summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/broker/src/org/apache/qpid/server/AMQChannel.java77
-rw-r--r--java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java24
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java26
-rw-r--r--java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java15
-rw-r--r--java/client/src/org/apache/qpid/client/AMQSession.java1
-rw-r--r--java/client/src/org/apache/qpid/client/BasicMessageConsumer.java6
-rw-r--r--java/client/src/org/apache/qpid/client/BasicMessageProducer.java25
-rw-r--r--java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java12
8 files changed, 113 insertions, 73 deletions
diff --git a/java/broker/src/org/apache/qpid/server/AMQChannel.java b/java/broker/src/org/apache/qpid/server/AMQChannel.java
index 8dc4626c46..d4226c42aa 100644
--- a/java/broker/src/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/org/apache/qpid/server/AMQChannel.java
@@ -23,26 +23,28 @@ import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.server.exchange.MessageRouter;
+import org.apache.qpid.server.management.DefaultManagedObject;
+import org.apache.qpid.server.management.Managable;
+import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.TxnBuffer;
import org.apache.qpid.server.txn.TxnOp;
-import org.apache.qpid.server.management.Managable;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.management.DefaultManagedObject;
-import javax.management.ObjectName;
-import javax.management.MalformedObjectNameException;
import javax.management.JMException;
import javax.management.MBeanException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
public class AMQChannel implements Managable
{
@@ -62,7 +64,7 @@ public class AMQChannel implements Managable
* The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
* value of this represents the <b>last</b> tag sent out
*/
- private long _deliveryTag;
+ private AtomicLong _deliveryTag = new AtomicLong(0);
/**
* A channel has a default queue (the last declared) that is used when no queue name is
@@ -74,7 +76,7 @@ public class AMQChannel implements Managable
* This tag is unique per subscription to a queue. The server returns this in response to a
* basic.consume request.
*/
- private int _consumerTag = 0;
+ private int _consumerTag;
/**
* The current message - which may be partial in the sense that not all frames have been received yet -
@@ -150,7 +152,7 @@ public class AMQChannel implements Managable
_txnBuffer.commit();
}
}
- catch(AMQException ex)
+ catch (AMQException ex)
{
throw new MBeanException(ex, ex.toString());
}
@@ -160,13 +162,13 @@ public class AMQChannel implements Managable
{
if (_transactional)
{
- synchronized (_txnBuffer)
+ synchronized(_txnBuffer)
{
try
{
_txnBuffer.rollback();
}
- catch(AMQException ex)
+ catch (AMQException ex)
{
throw new MBeanException(ex, ex.toString());
}
@@ -201,7 +203,7 @@ public class AMQChannel implements Managable
}
public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges)
- throws AMQException
+ throws AMQException
{
_channelId = channelId;
_channelName = _channelId + "-" + this.hashCode();
@@ -300,7 +302,7 @@ public class AMQChannel implements Managable
public long getNextDeliveryTag()
{
- return ++_deliveryTag;
+ return _deliveryTag.incrementAndGet();
}
public int getNextConsumerTag()
@@ -348,7 +350,7 @@ public class AMQChannel implements Managable
else
{
throw new AMQException(_log, "Consumer tag " + consumerTag + " not known to channel " +
- _channelId);
+ _channelId);
}
}
@@ -361,7 +363,7 @@ public class AMQChannel implements Managable
{
if (_transactional)
{
- synchronized (_txnBuffer)
+ synchronized(_txnBuffer)
{
_txnBuffer.rollback();//releases messages
}
@@ -390,7 +392,7 @@ public class AMQChannel implements Managable
*/
public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, String consumerTag, AMQQueue queue)
{
- synchronized (_unacknowledgedMessageMapLock)
+ synchronized(_unacknowledgedMessageMapLock)
{
_unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag));
checkSuspension();
@@ -405,7 +407,7 @@ public class AMQChannel implements Managable
{
// we must create a new map since all the messages will get a new delivery tag when they are redelivered
Map<Long, UnacknowledgedMessage> currentList;
- synchronized (_unacknowledgedMessageMapLock)
+ synchronized(_unacknowledgedMessageMapLock)
{
currentList = _unacknowledgedMessageMap;
_unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH);
@@ -426,7 +428,7 @@ public class AMQChannel implements Managable
public void resend(AMQProtocolSession session)
{
//messages go to this channel
- synchronized (_unacknowledgedMessageMapLock)
+ synchronized(_unacknowledgedMessageMapLock)
{
for (Map.Entry<Long, UnacknowledgedMessage> entry : _unacknowledgedMessageMap.entrySet())
{
@@ -449,7 +451,7 @@ public class AMQChannel implements Managable
*/
public void queueDeleted(AMQQueue queue)
{
- synchronized (_unacknowledgedMessageMapLock)
+ synchronized(_unacknowledgedMessageMapLock)
{
for (Map.Entry<Long, UnacknowledgedMessage> unacked : _unacknowledgedMessageMap.entrySet())
{
@@ -465,13 +467,25 @@ public class AMQChannel implements Managable
catch (AMQException e)
{
_log.error("Error decrementing ref count on message " + unackedMsg.message.getMessageId() + ": " +
- e, e);
+ e, e);
}
}
}
}
}
+ public synchronized long prepareNewMessageForDelivery(boolean acks, AMQMessage msg, String consumerTag, AMQQueue queue)
+ {
+ long deliveryTag = getNextDeliveryTag();
+
+ if (acks)
+ {
+ addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ }
+
+ return deliveryTag;
+ }
+
/**
* Acknowledge one or more messages.
*
@@ -498,7 +512,7 @@ public class AMQChannel implements Managable
if (multiple)
{
LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>();
- synchronized (_unacknowledgedMessageMapLock)
+ synchronized(_unacknowledgedMessageMapLock)
{
if (deliveryTag == 0)
{
@@ -514,10 +528,20 @@ public class AMQChannel implements Managable
throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
}
Iterator<Map.Entry<Long, UnacknowledgedMessage>> i = _unacknowledgedMessageMap.entrySet().iterator();
+
while (i.hasNext())
{
+
Map.Entry<Long, UnacknowledgedMessage> unacked = i.next();
+
+ if (unacked.getKey() > deliveryTag)
+ {
+ //This should not occur now.
+ throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() + " When deliveryTag is:" + deliveryTag + "ES:" + _unacknowledgedMessageMap.entrySet().toString());
+ }
+
i.remove();
+
acked.add(unacked.getValue());
if (unacked.getKey() == deliveryTag)
{
@@ -525,11 +549,12 @@ public class AMQChannel implements Managable
}
}
}
- }
+ }// synchronized
+
if (_log.isDebugEnabled())
{
_log.debug("Received multiple ack for delivery tag " + deliveryTag + ". Removing " +
- acked.size() + " items.");
+ acked.size() + " items.");
}
for (UnacknowledgedMessage msg : acked)
@@ -541,12 +566,14 @@ public class AMQChannel implements Managable
else
{
UnacknowledgedMessage msg;
- synchronized (_unacknowledgedMessageMapLock)
+ synchronized(_unacknowledgedMessageMapLock)
{
msg = _unacknowledgedMessageMap.remove(deliveryTag);
}
+
if (msg == null)
{
+ _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
}
msg.discard();
@@ -573,7 +600,7 @@ public class AMQChannel implements Managable
{
boolean suspend;
//noinspection SynchronizeOnNonFinalField
- synchronized (_unacknowledgedMessageMapLock)
+ synchronized(_unacknowledgedMessageMapLock)
{
suspend = _unacknowledgedMessageMap.size() >= _prefetchCount;
}
@@ -614,7 +641,7 @@ public class AMQChannel implements Managable
public void rollback() throws AMQException
{
//need to protect rollback and close from each other...
- synchronized (_txnBuffer)
+ synchronized(_txnBuffer)
{
_txnBuffer.rollback();
}
diff --git a/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java
index 7f1c7df224..a703595cc4 100644
--- a/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java
+++ b/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java
@@ -17,19 +17,21 @@
*/
package org.apache.qpid.server.exchange;
-import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
-import org.apache.log4j.Logger;
-import javax.management.openmbean.*;
-import javax.management.MBeanException;
import javax.management.JMException;
+import javax.management.MBeanException;
+import javax.management.openmbean.*;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.ArrayList;
public class DestNameExchange extends AbstractExchange
{
@@ -117,12 +119,14 @@ public class DestNameExchange extends AbstractExchange
}
public void createBinding(String queueName, String binding)
- throws JMException
+ throws JMException
{
AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(queueName);
if (queue == null)
+ {
throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
+ }
try
{
@@ -147,7 +151,7 @@ public class DestNameExchange extends AbstractExchange
{
assert queue != null;
assert routingKey != null;
- if(!_index.add(routingKey, queue))
+ if (!_index.add(routingKey, queue))
{
_logger.debug("Queue " + queue + " is already registered with routing key " + routingKey);
}
@@ -195,7 +199,7 @@ public class DestNameExchange extends AbstractExchange
_logger.debug("Publishing message to queue " + queues);
}
- for(AMQQueue q :queues)
+ for (AMQQueue q : queues)
{
q.deliver(payload);
}
diff --git a/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
index a3c2fab4f4..ef18f61070 100644
--- a/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -19,12 +19,12 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.BasicDeliverBody;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.AMQException;
/**
* Encapsulation of a supscription to a queue.
@@ -70,7 +70,8 @@ public class SubscriptionImpl implements Subscription
throws AMQException
{
AMQChannel channel = protocolSession.getChannel(channelId);
- if (channel == null) {
+ if (channel == null)
+ {
throw new NullPointerException("channel not found in protocol session");
}
@@ -99,8 +100,8 @@ public class SubscriptionImpl implements Subscription
private boolean equals(SubscriptionImpl psc)
{
return sessionKey.equals(psc.sessionKey)
- && psc.channel == channel
- && psc.consumerTag.equals(consumerTag);
+ && psc.channel == channel
+ && psc.consumerTag.equals(consumerTag);
}
public int hashCode()
@@ -113,18 +114,25 @@ public class SubscriptionImpl implements Subscription
return "[channel=" + channel + ", consumerTag=" + consumerTag + ", session=" + protocolSession.getKey() + "]";
}
+ /**
+ * This method can be called by each of the publisher threads.
+ * As a result all changes to the channel object must be thread safe.
+ *
+ * @param msg
+ * @param queue
+ * @throws AMQException
+ */
public void send(AMQMessage msg, AMQQueue queue) throws AMQException
{
if (msg != null)
{
- final long deliveryTag = channel.getNextDeliveryTag();
+ long deliveryTag = channel.prepareNewMessageForDelivery(_acks,msg,consumerTag,queue);
+
ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
- if (_acks)
- {
- channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
- }
+
protocolSession.writeFrame(frame);
+
// if we do not need to wait for client acknowledgements we can decrement
// the reference count immediately
if (!_acks)
diff --git a/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java
index baa414ff19..8dd268e673 100644
--- a/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -17,21 +17,20 @@
*/
package org.apache.qpid.server.store;
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.AMQException;
-import org.apache.log4j.Logger;
-import org.apache.commons.configuration.Configuration;
-import java.util.concurrent.ConcurrentMap;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.List;
/**
* A simple message store that stores the messages in a threadsafe structure in memory.
- *
*/
public class MemoryMessageStore implements MessageStore
{
@@ -48,7 +47,7 @@ public class MemoryMessageStore implements MessageStore
public void configure()
{
_log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash table");
- _messageMap = new ConcurrentHashMap<Long, AMQMessage>(DEFAULT_HASHTABLE_CAPACITY);
+ _messageMap = new ConcurrentHashMap<Long, AMQMessage>(DEFAULT_HASHTABLE_CAPACITY);
}
public void configure(String base, Configuration config)
@@ -65,7 +64,7 @@ public class MemoryMessageStore implements MessageStore
public void close() throws Exception
{
- if(_messageMap != null)
+ if (_messageMap != null)
{
_messageMap.clear();
_messageMap = null;
diff --git a/java/client/src/org/apache/qpid/client/AMQSession.java b/java/client/src/org/apache/qpid/client/AMQSession.java
index 2c59e5f809..4768399036 100644
--- a/java/client/src/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/org/apache/qpid/client/AMQSession.java
@@ -402,6 +402,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
//need to send ack for messages delivered to consumers so far
for(Iterator i = _consumers.values().iterator(); i.hasNext();)
{
+ //Sends acknowledgement to server
((BasicMessageConsumer) i.next()).acknowledgeLastDelivered();
}
diff --git a/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java
index 5d13a1cd41..b46c5f111d 100644
--- a/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java
@@ -382,9 +382,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
try
{
AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.deliverBody.deliveryTag,
- messageFrame.deliverBody.redelivered,
- messageFrame.contentHeader,
- messageFrame.bodies);
+ messageFrame.deliverBody.redelivered,
+ messageFrame.contentHeader,
+ messageFrame.bodies);
_logger.debug("Message is of type: " + jmsMessage.getClass().getName());
diff --git a/java/client/src/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/org/apache/qpid/client/BasicMessageProducer.java
index a6bc7a0781..694a4a7863 100644
--- a/java/client/src/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/org/apache/qpid/client/BasicMessageProducer.java
@@ -17,13 +17,13 @@
*/
package org.apache.qpid.client;
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.JMSBytesMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.*;
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
@@ -122,10 +122,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
void resubscribe() throws AMQException
{
- if (_destination != null)
- {
- declareDestination(_destination);
- }
+ if (_destination != null)
+ {
+ declareDestination(_destination);
+ }
}
private void declareDestination(AMQDestination destination)
@@ -330,16 +330,17 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
if (!(destination instanceof AMQDestination))
{
throw new JMSException("Unsupported destination class: " +
- (destination != null?destination.getClass():null));
+ (destination != null ? destination.getClass() : null));
}
declareDestination((AMQDestination)destination);
}
protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority,
- long timeToLive, boolean mandatory, boolean immediate) throws JMSException
+ long timeToLive, boolean mandatory, boolean immediate) throws JMSException
{
sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
}
+
/**
* The caller of this method must hold the failover mutex.
* @param destination
@@ -352,7 +353,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
* @throws JMSException
*/
protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority,
- long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException
+ long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException
{
AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
destination.getRoutingKey(), mandatory, immediate);
@@ -366,10 +367,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
//
// Very nasty temporary hack for GRM-206. Will be altered ASAP.
//
- if(message instanceof JMSBytesMessage)
+ if (message instanceof JMSBytesMessage)
{
JMSBytesMessage msg = (JMSBytesMessage) message;
- if(!msg.isReadable())
+ if (!msg.isReadable())
{
msg.reset();
}
@@ -442,7 +443,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
int dataLength = payload.remaining();
final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0;
- int frameCount = (int) (dataLength/framePayloadMax) + lastFrame;
+ int frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
final ContentBody[] bodies = new ContentBody[frameCount];
if (frameCount == 1)
diff --git a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 77685a0222..b181490fdd 100644
--- a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -230,7 +230,7 @@ public class AMQProtocolSession implements ProtocolVersionList
}
if (msg.isAllBodyDataReceived())
{
- deliverMessageToAMQSession(channelId, msg);
+ deliverMessageToAMQSession(channelId, msg);
}
}
@@ -260,8 +260,8 @@ public class AMQProtocolSession implements ProtocolVersionList
public void writeFrame(AMQDataBlock frame, boolean wait)
{
- WriteFuture f =_minaProtocolSession.write(frame);
- if(wait)
+ WriteFuture f = _minaProtocolSession.write(frame);
+ if (wait)
{
f.join();
}
@@ -269,7 +269,7 @@ public class AMQProtocolSession implements ProtocolVersionList
public void addSessionByChannel(int channelId, AMQSession session)
{
- if (channelId <=0)
+ if (channelId <= 0)
{
throw new IllegalArgumentException("Attempt to register a session with a channel id <= zero");
}
@@ -283,7 +283,7 @@ public class AMQProtocolSession implements ProtocolVersionList
public void removeSessionByChannel(int channelId)
{
- if (channelId <=0)
+ if (channelId <= 0)
{
throw new IllegalArgumentException("Attempt to deregister a session with a channel id <= zero");
}
@@ -299,7 +299,7 @@ public class AMQProtocolSession implements ProtocolVersionList
{
_logger.debug("closeSession called on protocol session for session " + session.getChannelId());
final int channelId = session.getChannelId();
- if (channelId <=0)
+ if (channelId <= 0)
{
throw new IllegalArgumentException("Attempt to close a channel with id < 0");
}