diff options
8 files changed, 113 insertions, 73 deletions
diff --git a/java/broker/src/org/apache/qpid/server/AMQChannel.java b/java/broker/src/org/apache/qpid/server/AMQChannel.java index 8dc4626c46..d4226c42aa 100644 --- a/java/broker/src/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/org/apache/qpid/server/AMQChannel.java @@ -23,26 +23,28 @@ import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.server.exchange.MessageRouter; +import org.apache.qpid.server.management.DefaultManagedObject; +import org.apache.qpid.server.management.Managable; +import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.TxnBuffer; import org.apache.qpid.server.txn.TxnOp; -import org.apache.qpid.server.management.Managable; -import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.management.DefaultManagedObject; -import javax.management.ObjectName; -import javax.management.MalformedObjectNameException; import javax.management.JMException; import javax.management.MBeanException; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + public class AMQChannel implements Managable { @@ -62,7 +64,7 @@ public class AMQChannel implements Managable * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that * value of this represents the <b>last</b> tag sent out */ - private long _deliveryTag; + private AtomicLong _deliveryTag = new AtomicLong(0); /** * A channel has a default queue (the last declared) that is used when no queue name is @@ -74,7 +76,7 @@ public class AMQChannel implements Managable * This tag is unique per subscription to a queue. The server returns this in response to a * basic.consume request. */ - private int _consumerTag = 0; + private int _consumerTag; /** * The current message - which may be partial in the sense that not all frames have been received yet - @@ -150,7 +152,7 @@ public class AMQChannel implements Managable _txnBuffer.commit(); } } - catch(AMQException ex) + catch (AMQException ex) { throw new MBeanException(ex, ex.toString()); } @@ -160,13 +162,13 @@ public class AMQChannel implements Managable { if (_transactional) { - synchronized (_txnBuffer) + synchronized(_txnBuffer) { try { _txnBuffer.rollback(); } - catch(AMQException ex) + catch (AMQException ex) { throw new MBeanException(ex, ex.toString()); } @@ -201,7 +203,7 @@ public class AMQChannel implements Managable } public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges) - throws AMQException + throws AMQException { _channelId = channelId; _channelName = _channelId + "-" + this.hashCode(); @@ -300,7 +302,7 @@ public class AMQChannel implements Managable public long getNextDeliveryTag() { - return ++_deliveryTag; + return _deliveryTag.incrementAndGet(); } public int getNextConsumerTag() @@ -348,7 +350,7 @@ public class AMQChannel implements Managable else { throw new AMQException(_log, "Consumer tag " + consumerTag + " not known to channel " + - _channelId); + _channelId); } } @@ -361,7 +363,7 @@ public class AMQChannel implements Managable { if (_transactional) { - synchronized (_txnBuffer) + synchronized(_txnBuffer) { _txnBuffer.rollback();//releases messages } @@ -390,7 +392,7 @@ public class AMQChannel implements Managable */ public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, String consumerTag, AMQQueue queue) { - synchronized (_unacknowledgedMessageMapLock) + synchronized(_unacknowledgedMessageMapLock) { _unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag)); checkSuspension(); @@ -405,7 +407,7 @@ public class AMQChannel implements Managable { // we must create a new map since all the messages will get a new delivery tag when they are redelivered Map<Long, UnacknowledgedMessage> currentList; - synchronized (_unacknowledgedMessageMapLock) + synchronized(_unacknowledgedMessageMapLock) { currentList = _unacknowledgedMessageMap; _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH); @@ -426,7 +428,7 @@ public class AMQChannel implements Managable public void resend(AMQProtocolSession session) { //messages go to this channel - synchronized (_unacknowledgedMessageMapLock) + synchronized(_unacknowledgedMessageMapLock) { for (Map.Entry<Long, UnacknowledgedMessage> entry : _unacknowledgedMessageMap.entrySet()) { @@ -449,7 +451,7 @@ public class AMQChannel implements Managable */ public void queueDeleted(AMQQueue queue) { - synchronized (_unacknowledgedMessageMapLock) + synchronized(_unacknowledgedMessageMapLock) { for (Map.Entry<Long, UnacknowledgedMessage> unacked : _unacknowledgedMessageMap.entrySet()) { @@ -465,13 +467,25 @@ public class AMQChannel implements Managable catch (AMQException e) { _log.error("Error decrementing ref count on message " + unackedMsg.message.getMessageId() + ": " + - e, e); + e, e); } } } } } + public synchronized long prepareNewMessageForDelivery(boolean acks, AMQMessage msg, String consumerTag, AMQQueue queue) + { + long deliveryTag = getNextDeliveryTag(); + + if (acks) + { + addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); + } + + return deliveryTag; + } + /** * Acknowledge one or more messages. * @@ -498,7 +512,7 @@ public class AMQChannel implements Managable if (multiple) { LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>(); - synchronized (_unacknowledgedMessageMapLock) + synchronized(_unacknowledgedMessageMapLock) { if (deliveryTag == 0) { @@ -514,10 +528,20 @@ public class AMQChannel implements Managable throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel"); } Iterator<Map.Entry<Long, UnacknowledgedMessage>> i = _unacknowledgedMessageMap.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry<Long, UnacknowledgedMessage> unacked = i.next(); + + if (unacked.getKey() > deliveryTag) + { + //This should not occur now. + throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() + " When deliveryTag is:" + deliveryTag + "ES:" + _unacknowledgedMessageMap.entrySet().toString()); + } + i.remove(); + acked.add(unacked.getValue()); if (unacked.getKey() == deliveryTag) { @@ -525,11 +549,12 @@ public class AMQChannel implements Managable } } } - } + }// synchronized + if (_log.isDebugEnabled()) { _log.debug("Received multiple ack for delivery tag " + deliveryTag + ". Removing " + - acked.size() + " items."); + acked.size() + " items."); } for (UnacknowledgedMessage msg : acked) @@ -541,12 +566,14 @@ public class AMQChannel implements Managable else { UnacknowledgedMessage msg; - synchronized (_unacknowledgedMessageMapLock) + synchronized(_unacknowledgedMessageMapLock) { msg = _unacknowledgedMessageMap.remove(deliveryTag); } + if (msg == null) { + _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId); throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId); } msg.discard(); @@ -573,7 +600,7 @@ public class AMQChannel implements Managable { boolean suspend; //noinspection SynchronizeOnNonFinalField - synchronized (_unacknowledgedMessageMapLock) + synchronized(_unacknowledgedMessageMapLock) { suspend = _unacknowledgedMessageMap.size() >= _prefetchCount; } @@ -614,7 +641,7 @@ public class AMQChannel implements Managable public void rollback() throws AMQException { //need to protect rollback and close from each other... - synchronized (_txnBuffer) + synchronized(_txnBuffer) { _txnBuffer.rollback(); } diff --git a/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java index 7f1c7df224..a703595cc4 100644 --- a/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java @@ -17,19 +17,21 @@ */ package org.apache.qpid.server.exchange; -import org.apache.qpid.server.queue.AMQQueue; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.*; -import org.apache.log4j.Logger; -import javax.management.openmbean.*; -import javax.management.MBeanException; import javax.management.JMException; +import javax.management.MBeanException; +import javax.management.openmbean.*; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.ArrayList; public class DestNameExchange extends AbstractExchange { @@ -117,12 +119,14 @@ public class DestNameExchange extends AbstractExchange } public void createBinding(String queueName, String binding) - throws JMException + throws JMException { AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(queueName); if (queue == null) + { throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange."); + } try { @@ -147,7 +151,7 @@ public class DestNameExchange extends AbstractExchange { assert queue != null; assert routingKey != null; - if(!_index.add(routingKey, queue)) + if (!_index.add(routingKey, queue)) { _logger.debug("Queue " + queue + " is already registered with routing key " + routingKey); } @@ -195,7 +199,7 @@ public class DestNameExchange extends AbstractExchange _logger.debug("Publishing message to queue " + queues); } - for(AMQQueue q :queues) + for (AMQQueue q : queues) { q.deliver(payload); } diff --git a/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java index a3c2fab4f4..ef18f61070 100644 --- a/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -19,12 +19,12 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.BasicDeliverBody; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.AMQException; /** * Encapsulation of a supscription to a queue. @@ -70,7 +70,8 @@ public class SubscriptionImpl implements Subscription throws AMQException { AMQChannel channel = protocolSession.getChannel(channelId); - if (channel == null) { + if (channel == null) + { throw new NullPointerException("channel not found in protocol session"); } @@ -99,8 +100,8 @@ public class SubscriptionImpl implements Subscription private boolean equals(SubscriptionImpl psc) { return sessionKey.equals(psc.sessionKey) - && psc.channel == channel - && psc.consumerTag.equals(consumerTag); + && psc.channel == channel + && psc.consumerTag.equals(consumerTag); } public int hashCode() @@ -113,18 +114,25 @@ public class SubscriptionImpl implements Subscription return "[channel=" + channel + ", consumerTag=" + consumerTag + ", session=" + protocolSession.getKey() + "]"; } + /** + * This method can be called by each of the publisher threads. + * As a result all changes to the channel object must be thread safe. + * + * @param msg + * @param queue + * @throws AMQException + */ public void send(AMQMessage msg, AMQQueue queue) throws AMQException { if (msg != null) { - final long deliveryTag = channel.getNextDeliveryTag(); + long deliveryTag = channel.prepareNewMessageForDelivery(_acks,msg,consumerTag,queue); + ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); - if (_acks) - { - channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); - } + protocolSession.writeFrame(frame); + // if we do not need to wait for client acknowledgements we can decrement // the reference count immediately if (!_acks) diff --git a/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java index baa414ff19..8dd268e673 100644 --- a/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java @@ -17,21 +17,20 @@ */ package org.apache.qpid.server.store; +import org.apache.commons.configuration.Configuration; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.AMQException; -import org.apache.log4j.Logger; -import org.apache.commons.configuration.Configuration; -import java.util.concurrent.ConcurrentMap; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; -import java.util.List; /** * A simple message store that stores the messages in a threadsafe structure in memory. - * */ public class MemoryMessageStore implements MessageStore { @@ -48,7 +47,7 @@ public class MemoryMessageStore implements MessageStore public void configure() { _log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash table"); - _messageMap = new ConcurrentHashMap<Long, AMQMessage>(DEFAULT_HASHTABLE_CAPACITY); + _messageMap = new ConcurrentHashMap<Long, AMQMessage>(DEFAULT_HASHTABLE_CAPACITY); } public void configure(String base, Configuration config) @@ -65,7 +64,7 @@ public class MemoryMessageStore implements MessageStore public void close() throws Exception { - if(_messageMap != null) + if (_messageMap != null) { _messageMap.clear(); _messageMap = null; diff --git a/java/client/src/org/apache/qpid/client/AMQSession.java b/java/client/src/org/apache/qpid/client/AMQSession.java index 2c59e5f809..4768399036 100644 --- a/java/client/src/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/org/apache/qpid/client/AMQSession.java @@ -402,6 +402,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi //need to send ack for messages delivered to consumers so far for(Iterator i = _consumers.values().iterator(); i.hasNext();) { + //Sends acknowledgement to server ((BasicMessageConsumer) i.next()).acknowledgeLastDelivered(); } diff --git a/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java index 5d13a1cd41..b46c5f111d 100644 --- a/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java @@ -382,9 +382,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.deliverBody.deliveryTag, - messageFrame.deliverBody.redelivered, - messageFrame.contentHeader, - messageFrame.bodies); + messageFrame.deliverBody.redelivered, + messageFrame.contentHeader, + messageFrame.bodies); _logger.debug("Message is of type: " + jmsMessage.getClass().getName()); diff --git a/java/client/src/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/org/apache/qpid/client/BasicMessageProducer.java index a6bc7a0781..694a4a7863 100644 --- a/java/client/src/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/org/apache/qpid/client/BasicMessageProducer.java @@ -17,13 +17,13 @@ */ package org.apache.qpid.client; +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.JMSBytesMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.*; -import org.apache.log4j.Logger; -import org.apache.mina.common.ByteBuffer; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -122,10 +122,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j void resubscribe() throws AMQException { - if (_destination != null) - { - declareDestination(_destination); - } + if (_destination != null) + { + declareDestination(_destination); + } } private void declareDestination(AMQDestination destination) @@ -330,16 +330,17 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j if (!(destination instanceof AMQDestination)) { throw new JMSException("Unsupported destination class: " + - (destination != null?destination.getClass():null)); + (destination != null ? destination.getClass() : null)); } declareDestination((AMQDestination)destination); } protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority, - long timeToLive, boolean mandatory, boolean immediate) throws JMSException + long timeToLive, boolean mandatory, boolean immediate) throws JMSException { sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent); } + /** * The caller of this method must hold the failover mutex. * @param destination @@ -352,7 +353,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j * @throws JMSException */ protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority, - long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException + long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException { AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(), destination.getRoutingKey(), mandatory, immediate); @@ -366,10 +367,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j // // Very nasty temporary hack for GRM-206. Will be altered ASAP. // - if(message instanceof JMSBytesMessage) + if (message instanceof JMSBytesMessage) { JMSBytesMessage msg = (JMSBytesMessage) message; - if(!msg.isReadable()) + if (!msg.isReadable()) { msg.reset(); } @@ -442,7 +443,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j int dataLength = payload.remaining(); final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0; - int frameCount = (int) (dataLength/framePayloadMax) + lastFrame; + int frameCount = (int) (dataLength / framePayloadMax) + lastFrame; final ContentBody[] bodies = new ContentBody[frameCount]; if (frameCount == 1) diff --git a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java index 77685a0222..b181490fdd 100644 --- a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -230,7 +230,7 @@ public class AMQProtocolSession implements ProtocolVersionList } if (msg.isAllBodyDataReceived()) { - deliverMessageToAMQSession(channelId, msg); + deliverMessageToAMQSession(channelId, msg); } } @@ -260,8 +260,8 @@ public class AMQProtocolSession implements ProtocolVersionList public void writeFrame(AMQDataBlock frame, boolean wait) { - WriteFuture f =_minaProtocolSession.write(frame); - if(wait) + WriteFuture f = _minaProtocolSession.write(frame); + if (wait) { f.join(); } @@ -269,7 +269,7 @@ public class AMQProtocolSession implements ProtocolVersionList public void addSessionByChannel(int channelId, AMQSession session) { - if (channelId <=0) + if (channelId <= 0) { throw new IllegalArgumentException("Attempt to register a session with a channel id <= zero"); } @@ -283,7 +283,7 @@ public class AMQProtocolSession implements ProtocolVersionList public void removeSessionByChannel(int channelId) { - if (channelId <=0) + if (channelId <= 0) { throw new IllegalArgumentException("Attempt to deregister a session with a channel id <= zero"); } @@ -299,7 +299,7 @@ public class AMQProtocolSession implements ProtocolVersionList { _logger.debug("closeSession called on protocol session for session " + session.getChannelId()); final int channelId = session.getChannelId(); - if (channelId <=0) + if (channelId <= 0) { throw new IllegalArgumentException("Attempt to close a channel with id < 0"); } |