diff options
author | Martin Ritchie <ritchiem@apache.org> | 2006-09-27 09:09:42 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2006-09-27 09:09:42 +0000 |
commit | 1ba25d9231401f2f34ee41893d402e3cb2f299ed (patch) | |
tree | f0effe6896595dd38c0bbc60350b522c5416f480 | |
parent | 6ce702dfb4ea0e1835804efd328be2eee79e23b3 (diff) | |
download | qpid-python-1ba25d9231401f2f34ee41893d402e3cb2f299ed.tar.gz |
AMQProtocolSession.java - white space changes
BasicMessageProducer.java - white space changes
BasicMessageConsumer.java - white space changes
AMQSession.java - added a comment
MemoryMessageStore.java - white space changes
SubscriptionImpl.java AMQChannel.java - Removed race condition where two messages could get the same delivery tag and when using acks where messages can be added to the UnackMap out of sequence, Causing unknown message to ack exceptions.
DestNameExchange.java - white space/style changes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@450384 13f79535-47bb-0310-9956-ffa450edef68
8 files changed, 113 insertions, 73 deletions
diff --git a/java/broker/src/org/apache/qpid/server/AMQChannel.java b/java/broker/src/org/apache/qpid/server/AMQChannel.java index 8dc4626c46..d4226c42aa 100644 --- a/java/broker/src/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/org/apache/qpid/server/AMQChannel.java @@ -23,26 +23,28 @@ import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.server.exchange.MessageRouter; +import org.apache.qpid.server.management.DefaultManagedObject; +import org.apache.qpid.server.management.Managable; +import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.TxnBuffer; import org.apache.qpid.server.txn.TxnOp; -import org.apache.qpid.server.management.Managable; -import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.management.DefaultManagedObject; -import javax.management.ObjectName; -import javax.management.MalformedObjectNameException; import javax.management.JMException; import javax.management.MBeanException; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + public class AMQChannel implements Managable { @@ -62,7 +64,7 @@ public class AMQChannel implements Managable * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that * value of this represents the <b>last</b> tag sent out */ - private long _deliveryTag; + private AtomicLong _deliveryTag = new AtomicLong(0); /** * A channel has a default queue (the last declared) that is used when no queue name is @@ -74,7 +76,7 @@ public class AMQChannel implements Managable * This tag is unique per subscription to a queue. The server returns this in response to a * basic.consume request. */ - private int _consumerTag = 0; + private int _consumerTag; /** * The current message - which may be partial in the sense that not all frames have been received yet - @@ -150,7 +152,7 @@ public class AMQChannel implements Managable _txnBuffer.commit(); } } - catch(AMQException ex) + catch (AMQException ex) { throw new MBeanException(ex, ex.toString()); } @@ -160,13 +162,13 @@ public class AMQChannel implements Managable { if (_transactional) { - synchronized (_txnBuffer) + synchronized(_txnBuffer) { try { _txnBuffer.rollback(); } - catch(AMQException ex) + catch (AMQException ex) { throw new MBeanException(ex, ex.toString()); } @@ -201,7 +203,7 @@ public class AMQChannel implements Managable } public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges) - throws AMQException + throws AMQException { _channelId = channelId; _channelName = _channelId + "-" + this.hashCode(); @@ -300,7 +302,7 @@ public class AMQChannel implements Managable public long getNextDeliveryTag() { - return ++_deliveryTag; + return _deliveryTag.incrementAndGet(); } public int getNextConsumerTag() @@ -348,7 +350,7 @@ public class AMQChannel implements Managable else { throw new AMQException(_log, "Consumer tag " + consumerTag + " not known to channel " + - _channelId); + _channelId); } } @@ -361,7 +363,7 @@ public class AMQChannel implements Managable { if (_transactional) { - synchronized (_txnBuffer) + synchronized(_txnBuffer) { _txnBuffer.rollback();//releases messages } @@ -390,7 +392,7 @@ public class AMQChannel implements Managable */ public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, String consumerTag, AMQQueue queue) { - synchronized (_unacknowledgedMessageMapLock) + synchronized(_unacknowledgedMessageMapLock) { _unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag)); checkSuspension(); @@ -405,7 +407,7 @@ public class AMQChannel implements Managable { // we must create a new map since all the messages will get a new delivery tag when they are redelivered Map<Long, UnacknowledgedMessage> currentList; - synchronized (_unacknowledgedMessageMapLock) + synchronized(_unacknowledgedMessageMapLock) { currentList = _unacknowledgedMessageMap; _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH); @@ -426,7 +428,7 @@ public class AMQChannel implements Managable public void resend(AMQProtocolSession session) { //messages go to this channel - synchronized (_unacknowledgedMessageMapLock) + synchronized(_unacknowledgedMessageMapLock) { for (Map.Entry<Long, UnacknowledgedMessage> entry : _unacknowledgedMessageMap.entrySet()) { @@ -449,7 +451,7 @@ public class AMQChannel implements Managable */ public void queueDeleted(AMQQueue queue) { - synchronized (_unacknowledgedMessageMapLock) + synchronized(_unacknowledgedMessageMapLock) { for (Map.Entry<Long, UnacknowledgedMessage> unacked : _unacknowledgedMessageMap.entrySet()) { @@ -465,13 +467,25 @@ public class AMQChannel implements Managable catch (AMQException e) { _log.error("Error decrementing ref count on message " + unackedMsg.message.getMessageId() + ": " + - e, e); + e, e); } } } } } + public synchronized long prepareNewMessageForDelivery(boolean acks, AMQMessage msg, String consumerTag, AMQQueue queue) + { + long deliveryTag = getNextDeliveryTag(); + + if (acks) + { + addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); + } + + return deliveryTag; + } + /** * Acknowledge one or more messages. * @@ -498,7 +512,7 @@ public class AMQChannel implements Managable if (multiple) { LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>(); - synchronized (_unacknowledgedMessageMapLock) + synchronized(_unacknowledgedMessageMapLock) { if (deliveryTag == 0) { @@ -514,10 +528,20 @@ public class AMQChannel implements Managable throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel"); } Iterator<Map.Entry<Long, UnacknowledgedMessage>> i = _unacknowledgedMessageMap.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry<Long, UnacknowledgedMessage> unacked = i.next(); + + if (unacked.getKey() > deliveryTag) + { + //This should not occur now. + throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() + " When deliveryTag is:" + deliveryTag + "ES:" + _unacknowledgedMessageMap.entrySet().toString()); + } + i.remove(); + acked.add(unacked.getValue()); if (unacked.getKey() == deliveryTag) { @@ -525,11 +549,12 @@ public class AMQChannel implements Managable } } } - } + }// synchronized + if (_log.isDebugEnabled()) { _log.debug("Received multiple ack for delivery tag " + deliveryTag + ". Removing " + - acked.size() + " items."); + acked.size() + " items."); } for (UnacknowledgedMessage msg : acked) @@ -541,12 +566,14 @@ public class AMQChannel implements Managable else { UnacknowledgedMessage msg; - synchronized (_unacknowledgedMessageMapLock) + synchronized(_unacknowledgedMessageMapLock) { msg = _unacknowledgedMessageMap.remove(deliveryTag); } + if (msg == null) { + _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId); throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId); } msg.discard(); @@ -573,7 +600,7 @@ public class AMQChannel implements Managable { boolean suspend; //noinspection SynchronizeOnNonFinalField - synchronized (_unacknowledgedMessageMapLock) + synchronized(_unacknowledgedMessageMapLock) { suspend = _unacknowledgedMessageMap.size() >= _prefetchCount; } @@ -614,7 +641,7 @@ public class AMQChannel implements Managable public void rollback() throws AMQException { //need to protect rollback and close from each other... - synchronized (_txnBuffer) + synchronized(_txnBuffer) { _txnBuffer.rollback(); } diff --git a/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java index 7f1c7df224..a703595cc4 100644 --- a/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java @@ -17,19 +17,21 @@ */ package org.apache.qpid.server.exchange; -import org.apache.qpid.server.queue.AMQQueue; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.*; -import org.apache.log4j.Logger; -import javax.management.openmbean.*; -import javax.management.MBeanException; import javax.management.JMException; +import javax.management.MBeanException; +import javax.management.openmbean.*; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.ArrayList; public class DestNameExchange extends AbstractExchange { @@ -117,12 +119,14 @@ public class DestNameExchange extends AbstractExchange } public void createBinding(String queueName, String binding) - throws JMException + throws JMException { AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(queueName); if (queue == null) + { throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange."); + } try { @@ -147,7 +151,7 @@ public class DestNameExchange extends AbstractExchange { assert queue != null; assert routingKey != null; - if(!_index.add(routingKey, queue)) + if (!_index.add(routingKey, queue)) { _logger.debug("Queue " + queue + " is already registered with routing key " + routingKey); } @@ -195,7 +199,7 @@ public class DestNameExchange extends AbstractExchange _logger.debug("Publishing message to queue " + queues); } - for(AMQQueue q :queues) + for (AMQQueue q : queues) { q.deliver(payload); } diff --git a/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java index a3c2fab4f4..ef18f61070 100644 --- a/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -19,12 +19,12 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.BasicDeliverBody; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.AMQException; /** * Encapsulation of a supscription to a queue. @@ -70,7 +70,8 @@ public class SubscriptionImpl implements Subscription throws AMQException { AMQChannel channel = protocolSession.getChannel(channelId); - if (channel == null) { + if (channel == null) + { throw new NullPointerException("channel not found in protocol session"); } @@ -99,8 +100,8 @@ public class SubscriptionImpl implements Subscription private boolean equals(SubscriptionImpl psc) { return sessionKey.equals(psc.sessionKey) - && psc.channel == channel - && psc.consumerTag.equals(consumerTag); + && psc.channel == channel + && psc.consumerTag.equals(consumerTag); } public int hashCode() @@ -113,18 +114,25 @@ public class SubscriptionImpl implements Subscription return "[channel=" + channel + ", consumerTag=" + consumerTag + ", session=" + protocolSession.getKey() + "]"; } + /** + * This method can be called by each of the publisher threads. + * As a result all changes to the channel object must be thread safe. + * + * @param msg + * @param queue + * @throws AMQException + */ public void send(AMQMessage msg, AMQQueue queue) throws AMQException { if (msg != null) { - final long deliveryTag = channel.getNextDeliveryTag(); + long deliveryTag = channel.prepareNewMessageForDelivery(_acks,msg,consumerTag,queue); + ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); - if (_acks) - { - channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); - } + protocolSession.writeFrame(frame); + // if we do not need to wait for client acknowledgements we can decrement // the reference count immediately if (!_acks) diff --git a/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java index baa414ff19..8dd268e673 100644 --- a/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java @@ -17,21 +17,20 @@ */ package org.apache.qpid.server.store; +import org.apache.commons.configuration.Configuration; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.AMQException; -import org.apache.log4j.Logger; -import org.apache.commons.configuration.Configuration; -import java.util.concurrent.ConcurrentMap; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; -import java.util.List; /** * A simple message store that stores the messages in a threadsafe structure in memory. - * */ public class MemoryMessageStore implements MessageStore { @@ -48,7 +47,7 @@ public class MemoryMessageStore implements MessageStore public void configure() { _log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash table"); - _messageMap = new ConcurrentHashMap<Long, AMQMessage>(DEFAULT_HASHTABLE_CAPACITY); + _messageMap = new ConcurrentHashMap<Long, AMQMessage>(DEFAULT_HASHTABLE_CAPACITY); } public void configure(String base, Configuration config) @@ -65,7 +64,7 @@ public class MemoryMessageStore implements MessageStore public void close() throws Exception { - if(_messageMap != null) + if (_messageMap != null) { _messageMap.clear(); _messageMap = null; diff --git a/java/client/src/org/apache/qpid/client/AMQSession.java b/java/client/src/org/apache/qpid/client/AMQSession.java index 2c59e5f809..4768399036 100644 --- a/java/client/src/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/org/apache/qpid/client/AMQSession.java @@ -402,6 +402,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi //need to send ack for messages delivered to consumers so far for(Iterator i = _consumers.values().iterator(); i.hasNext();) { + //Sends acknowledgement to server ((BasicMessageConsumer) i.next()).acknowledgeLastDelivered(); } diff --git a/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java index 5d13a1cd41..b46c5f111d 100644 --- a/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java @@ -382,9 +382,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.deliverBody.deliveryTag, - messageFrame.deliverBody.redelivered, - messageFrame.contentHeader, - messageFrame.bodies); + messageFrame.deliverBody.redelivered, + messageFrame.contentHeader, + messageFrame.bodies); _logger.debug("Message is of type: " + jmsMessage.getClass().getName()); diff --git a/java/client/src/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/org/apache/qpid/client/BasicMessageProducer.java index a6bc7a0781..694a4a7863 100644 --- a/java/client/src/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/org/apache/qpid/client/BasicMessageProducer.java @@ -17,13 +17,13 @@ */ package org.apache.qpid.client; +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.JMSBytesMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.*; -import org.apache.log4j.Logger; -import org.apache.mina.common.ByteBuffer; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -122,10 +122,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j void resubscribe() throws AMQException { - if (_destination != null) - { - declareDestination(_destination); - } + if (_destination != null) + { + declareDestination(_destination); + } } private void declareDestination(AMQDestination destination) @@ -330,16 +330,17 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j if (!(destination instanceof AMQDestination)) { throw new JMSException("Unsupported destination class: " + - (destination != null?destination.getClass():null)); + (destination != null ? destination.getClass() : null)); } declareDestination((AMQDestination)destination); } protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority, - long timeToLive, boolean mandatory, boolean immediate) throws JMSException + long timeToLive, boolean mandatory, boolean immediate) throws JMSException { sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent); } + /** * The caller of this method must hold the failover mutex. * @param destination @@ -352,7 +353,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j * @throws JMSException */ protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority, - long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException + long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException { AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(), destination.getRoutingKey(), mandatory, immediate); @@ -366,10 +367,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j // // Very nasty temporary hack for GRM-206. Will be altered ASAP. // - if(message instanceof JMSBytesMessage) + if (message instanceof JMSBytesMessage) { JMSBytesMessage msg = (JMSBytesMessage) message; - if(!msg.isReadable()) + if (!msg.isReadable()) { msg.reset(); } @@ -442,7 +443,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j int dataLength = payload.remaining(); final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0; - int frameCount = (int) (dataLength/framePayloadMax) + lastFrame; + int frameCount = (int) (dataLength / framePayloadMax) + lastFrame; final ContentBody[] bodies = new ContentBody[frameCount]; if (frameCount == 1) diff --git a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java index 77685a0222..b181490fdd 100644 --- a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -230,7 +230,7 @@ public class AMQProtocolSession implements ProtocolVersionList } if (msg.isAllBodyDataReceived()) { - deliverMessageToAMQSession(channelId, msg); + deliverMessageToAMQSession(channelId, msg); } } @@ -260,8 +260,8 @@ public class AMQProtocolSession implements ProtocolVersionList public void writeFrame(AMQDataBlock frame, boolean wait) { - WriteFuture f =_minaProtocolSession.write(frame); - if(wait) + WriteFuture f = _minaProtocolSession.write(frame); + if (wait) { f.join(); } @@ -269,7 +269,7 @@ public class AMQProtocolSession implements ProtocolVersionList public void addSessionByChannel(int channelId, AMQSession session) { - if (channelId <=0) + if (channelId <= 0) { throw new IllegalArgumentException("Attempt to register a session with a channel id <= zero"); } @@ -283,7 +283,7 @@ public class AMQProtocolSession implements ProtocolVersionList public void removeSessionByChannel(int channelId) { - if (channelId <=0) + if (channelId <= 0) { throw new IllegalArgumentException("Attempt to deregister a session with a channel id <= zero"); } @@ -299,7 +299,7 @@ public class AMQProtocolSession implements ProtocolVersionList { _logger.debug("closeSession called on protocol session for session " + session.getChannelId()); final int channelId = session.getChannelId(); - if (channelId <=0) + if (channelId <= 0) { throw new IllegalArgumentException("Attempt to close a channel with id < 0"); } |