diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client')
14 files changed, 330 insertions, 173 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 6c9fcc0f4c..5c48d73e43 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -268,6 +268,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect //Indicates whether persistent messages are synchronized private boolean _syncPersistence; + //Indicates whether we need to sync on every message ack + private boolean _syncAck; + + //Indicates the sync publish options (persistent|all) + //By default it's async publish + private String _syncPublish = ""; + /** * @param broker brokerdetails * @param username username @@ -348,25 +355,53 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException { // set this connection maxPrefetch - if (connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH) != null) + if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null) { - _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH)); + _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH)); } else { // use the defaul value set for all connections _maxPrefetch = Integer.parseInt(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, - ClientProperties.MAX_PREFETCH_DEFAULT)); + ClientProperties.MAX_PREFETCH_DEFAULT)); } - if (connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE) != null) + if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE) != null) { - _syncPersistence = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE)); + _syncPersistence = + Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE)); + _logger.warn("sync_persistence is a deprecated property, " + + "please use sync_publish={persistent|all} instead"); } else { // use the defaul value set for all connections _syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME); + if (_syncPersistence) + { + _logger.warn("sync_persistence is a deprecated property, " + + "please use sync_publish={persistent|all} instead"); + } + } + + if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK) != null) + { + _syncAck = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK)); + } + else + { + // use the defaul value set for all connections + _syncAck = Boolean.getBoolean(ClientProperties.SYNC_ACK_PROP_NAME); + } + + if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH) != null) + { + _syncPublish = connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH); + } + else + { + // use the defaul value set for all connections + _syncPublish = System.getProperty((ClientProperties.SYNC_ACK_PROP_NAME),_syncPublish); } _failoverPolicy = new FailoverPolicy(connectionURL, this); @@ -1469,6 +1504,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _syncPersistence; } + /** + * Indicates whether we need to sync on every message ack + */ + public boolean getSyncAck() + { + return _syncAck; + } + + public String getSyncPublish() + { + return _syncPublish; + } + public void setIdleTimeout(long l) { _delegate.setIdleTimeout(l); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 29f1aec2f5..c2fb05d94e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -239,12 +239,6 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { _conn.failoverPrep(); _qpidConnection.resume(); - - if (_conn.firePreResubscribe()) - { - _conn.resubscribeSessions(); - } - _conn.fireFailoverComplete(); return; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java index 08fd49286b..d96544adf8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java @@ -22,14 +22,12 @@ package org.apache.qpid.client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.qpid.AMQException; import javax.jms.IllegalStateException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Queue; import javax.jms.QueueBrowser; - import java.util.ArrayList; import java.util.Enumeration; import java.util.concurrent.atomic.AtomicBoolean; @@ -90,38 +88,11 @@ public class AMQQueueBrowser implements QueueBrowser { checkState(); final BasicMessageConsumer consumer = - (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false); + (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false); _consumers.add(consumer); - return new Enumeration() - { - - Message _nextMessage = consumer == null ? null : consumer.receive(1000); - - public boolean hasMoreElements() - { - _logger.info("QB:hasMoreElements:" + (_nextMessage != null)); - return (_nextMessage != null); - } - - public Object nextElement() - { - Message msg = _nextMessage; - try - { - _logger.info("QB:nextElement about to receive"); - _nextMessage = consumer.receive(1000); - _logger.info("QB:nextElement received:" + _nextMessage); - } - catch (JMSException e) - { - _logger.warn("Exception caught while queue browsing", e); - _nextMessage = null; - } - return msg; - } - }; + return new QueueBrowserEnumeration(consumer); } public void close() throws JMSException @@ -134,4 +105,39 @@ public class AMQQueueBrowser implements QueueBrowser _consumers.clear(); } + private class QueueBrowserEnumeration implements Enumeration + { + Message _nextMessage; + private BasicMessageConsumer _consumer; + + public QueueBrowserEnumeration(BasicMessageConsumer consumer) throws JMSException + { + _nextMessage = consumer == null ? null : consumer.receiveBrowse(); + _logger.info("QB:created with first element:" + _nextMessage); + _consumer = consumer; + } + + public boolean hasMoreElements() + { + _logger.info("QB:hasMoreElements:" + (_nextMessage != null)); + return (_nextMessage != null); + } + + public Object nextElement() + { + Message msg = _nextMessage; + try + { + _logger.info("QB:nextElement about to receive"); + _nextMessage = _consumer.receiveBrowse(); + _logger.info("QB:nextElement received:" + _nextMessage); + } + catch (JMSException e) + { + _logger.warn("Exception caught while queue browsing", e); + _nextMessage = null; + } + return msg; + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 733bee2d81..9012632adf 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -575,12 +575,19 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, final AMQShortString exchangeName, final AMQDestination destination) throws AMQException { + bindQueue(queueName, routingKey, arguments, exchangeName, destination, false); + } + + public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, + final AMQShortString exchangeName, final AMQDestination destination, + final boolean nowait) throws AMQException + { /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() { public Object execute() throws AMQException, FailoverException { - sendQueueBind(queueName, routingKey, arguments, exchangeName, destination); + sendQueueBind(queueName, routingKey, arguments, exchangeName, destination, nowait); return null; } }, _connection).execute(); @@ -595,7 +602,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName, AMQDestination destination) throws AMQException, FailoverException; + final AMQShortString exchangeName, AMQDestination destination, + final boolean nowait) throws AMQException, FailoverException; /** * Closes the session. @@ -1815,6 +1823,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic void failoverPrep() { startDispatcherIfNecessary(); + syncDispatchQueue(); + } + + void syncDispatchQueue() + { final CountDownLatch signal = new CountDownLatch(1); _queue.add(new Dispatchable() { public void dispatch(AMQSession ssn) @@ -1828,7 +1841,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } catch (InterruptedException e) { - // pass + throw new RuntimeException(e); } } @@ -1859,6 +1872,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _inRecovery = inRecovery; } + boolean isStarted() + { + return _startedAtLeastOnce.get(); + } + /** * Starts the session, which ensures that it is not suspended and that its event dispatcher is running. * @@ -2281,7 +2299,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * @todo Be aware of possible changes to parameter order as versions change. */ protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean noLocal) + final boolean noLocal) throws AMQException + { + return declareQueue(amqd, protocolHandler, noLocal, false); + } + + protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + final boolean noLocal, final boolean nowait) throws AMQException { /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/ @@ -2296,14 +2320,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic amqd.setQueueName(protocolHandler.generateQueueName()); } - sendQueueDeclare(amqd, protocolHandler); + sendQueueDeclare(amqd, protocolHandler, nowait); return amqd.getAMQQueueName(); } }, _connection).execute(); } - public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException; + public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + final boolean nowait) throws AMQException, FailoverException; /** * Undeclares the specified queue. @@ -2416,14 +2441,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic AMQProtocolHandler protocolHandler = getProtocolHandler(); - declareExchange(amqd, protocolHandler, false); + declareExchange(amqd, protocolHandler, nowait); - AMQShortString queueName = declareQueue(amqd, protocolHandler, consumer.isNoLocal()); + AMQShortString queueName = declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait); // store the consumer queue name consumer.setQueuename(queueName); - bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd); + bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait); // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch if (!_immediatePrefetch) @@ -2455,11 +2480,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { - consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector()); - } - catch (JMSException e) // thrown by getMessageSelector - { - throw new AMQException(null, e.getMessage(), e); + consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer._messageSelector); } catch (FailoverException e) { @@ -2531,8 +2552,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic for (C consumer : consumers) { - consumer.failedOver(); + consumer.failedOverPre(); registerConsumer(consumer, true); + consumer.failedOverPost(); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index a8487b04e9..34457d745f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -135,7 +135,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { try { - flushAcknowledgments(); + flushAcknowledgments(true); } catch (Throwable t) { @@ -236,12 +236,17 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic void flushAcknowledgments() { + flushAcknowledgments(false); + } + + void flushAcknowledgments(boolean setSyncBit) + { synchronized (unacked) { if (unackedCount > 0) { messageAcknowledge - (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); + (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE,setSyncBit); clearUnacked(); } } @@ -249,6 +254,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic void messageAcknowledge(RangeSet ranges, boolean accept) { + messageAcknowledge(ranges,accept,false); + } + + void messageAcknowledge(RangeSet ranges, boolean accept,boolean setSyncBit) + { Session ssn = getQpidSession(); for (Range range : ranges) { @@ -257,7 +267,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic ssn.flushProcessed(accept ? BATCH : NONE); if (accept) { - ssn.messageAccept(ranges, UNRELIABLE); + ssn.messageAccept(ranges, UNRELIABLE,setSyncBit? SYNC : NONE); } } @@ -272,7 +282,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * @param arguments 0_8 specific */ public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, - final FieldTable arguments, final AMQShortString exchangeName, final AMQDestination destination) + final FieldTable arguments, final AMQShortString exchangeName, + final AMQDestination destination, final boolean nowait) throws AMQException, FailoverException { Map args = FiledTableSupport.convertToMap(arguments); @@ -287,9 +298,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic _logger.debug("Binding queue : " + queueName.toString() + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString()); getQpidSession().exchangeBind(queueName.toString(), exchangeName.toString(), rk.toString(), args); } - // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + if (!nowait) + { + // We need to sync so that we get notify of an error. + getQpidSession().sync(); + getCurrentException(); + } } @@ -501,18 +515,24 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.WINDOW); } - getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF); + getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF, + Option.UNRELIABLE); // We need to sync so that we get notify of an error. // only if not immediat prefetch - if(prefetch() && (consumer.isStrated() || _immediatePrefetch)) + if(prefetch() && (isStarted() || _immediatePrefetch)) { // set the flow getQpidSession().messageFlow(consumerTag, MessageCreditUnit.MESSAGE, - getAMQConnection().getMaxPrefetch()); + getAMQConnection().getMaxPrefetch(), + Option.UNRELIABLE); + } + + if (!nowait) + { + getQpidSession().sync(); + getCurrentException(); } - getQpidSession().sync(); - getCurrentException(); } /** @@ -540,14 +560,18 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic null, name.toString().startsWith("amq.")? Option.PASSIVE:Option.NONE); // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + if (!nowait) + { + getQpidSession().sync(); + getCurrentException(); + } } /** * Declare a queue with the given queueName */ - public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) + public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + final boolean nowait) throws AMQException, FailoverException { // do nothing this is only used by 0_8 @@ -557,7 +581,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * Declare a queue with the given queueName */ public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean noLocal) + final boolean noLocal, final boolean nowait) throws AMQException, FailoverException { AMQShortString res; @@ -581,9 +605,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic amqd.isDurable() ? Option.DURABLE : Option.NONE, !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE); // passive --> false - // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + if (!nowait) + { + // We need to sync so that we get notify of an error. + getQpidSession().sync(); + getCurrentException(); + } return res; } @@ -609,7 +636,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { for (BasicMessageConsumer consumer : _consumers.values()) { - getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag())); + getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()), + Option.UNRELIABLE); } } else @@ -625,17 +653,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic if (consumer.getMessageListener() != null) { getQpidSession().messageFlow(consumerTag, - MessageCreditUnit.MESSAGE, 1); + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); } } else { getQpidSession() .messageFlow(consumerTag, MessageCreditUnit.MESSAGE, - getAMQConnection().getMaxPrefetch()); + getAMQConnection().getMaxPrefetch(), + Option.UNRELIABLE); } getQpidSession() - .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF); + .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF, + Option.UNRELIABLE); } catch (Exception e) { @@ -700,6 +731,19 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void opened(Session ssn) {} + public void resumed(Session ssn) + { + _qpidConnection = ssn.getConnection(); + try + { + resubscribe(); + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + } + public void message(Session ssn, MessageTransfer xfr) { messageReceived(new UnprocessedMessage_0_10(xfr)); @@ -716,7 +760,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void closed(Session ssn) {} protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean noLocal) + final boolean noLocal, final boolean nowait) throws AMQException { /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/ @@ -736,34 +780,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic amqd.setQueueName(new AMQShortString( binddingKey + "@" + amqd.getExchangeName().toString() + "_" + UUID.randomUUID())); } - return send0_10QueueDeclare(amqd, protocolHandler, noLocal); + return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait); } }, _connection).execute(); } - - void start() throws AMQException - { - super.start(); - for(BasicMessageConsumer c: _consumers.values()) - { - c.start(); - } - } - - - void stop() throws AMQException - { - super.stop(); - for(BasicMessageConsumer c: _consumers.values()) - { - c.stop(); - } - } - - - - public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 6451ae60be..ff8631c12e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -106,7 +106,8 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B } public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName, final AMQDestination dest) throws AMQException, FailoverException + final AMQShortString exchangeName, final AMQDestination dest, + final boolean nowait) throws AMQException, FailoverException { getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody (getTicket(),queueName,exchangeName,routingKey,false,arguments). @@ -300,13 +301,14 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B { ExchangeDeclareBody body = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type, name.toString().startsWith("amq."), - false,false,false,nowait,null); + false,false,false,false,null); AMQFrame exchangeDeclare = body.generateFrame(_channelId); protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); } - public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException + public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + final boolean nowait) throws AMQException, FailoverException { QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 76422c6297..2bb443a090 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -441,7 +441,9 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa o = _synchronousQueue.take(); } return o; - } + } + + abstract Message receiveBrowse() throws JMSException; public Message receiveNoWait() throws JMSException { @@ -1037,23 +1039,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa _synchronousQueue.clear(); } - public void start() - { - // do nothing as this is a 0_10 feature - } - - - public void stop() - { - // do nothing as this is a 0_10 feature - } - - public boolean isStrated() - { - // do nothing as this is a 0_10 feature - return false; - } - public AMQShortString getQueuename() { return _queuename; @@ -1070,10 +1055,13 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa } /** to be called when a failover has occured */ - public void failedOver() + public void failedOverPre() { clearReceiveQueue(); // TGM FIXME: think this should just be removed // clearUnackedMessages(); } + + public void failedOverPost() {} + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 7d535643c0..8b17dcf91f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -31,6 +31,7 @@ import org.apache.qpid.filter.JMSSelectorFilter; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; +import javax.jms.Message; import javax.jms.MessageListener; import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; @@ -148,7 +149,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM if (isMessageListenerSet() && ! getSession().prefetch()) { _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1); + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); } _logger.debug("messageOk, trying to notify"); super.notifyMessage(jmsMessage); @@ -246,7 +248,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM if(! getSession().prefetch()) { _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1); + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); } } // now we need to acquire this message if needed @@ -258,9 +261,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM _logger.debug("filterMessage - trying to acquire message"); } messageOk = acquireMessage(message); - _logger.debug("filterMessage - *************************************"); _logger.debug("filterMessage - message acquire status : " + messageOk); - _logger.debug("filterMessage - *************************************"); } return messageOk; } @@ -335,7 +336,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM if (messageListener != null && ! getSession().prefetch()) { _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1); + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); } if (messageListener != null && !_synchronousQueue.isEmpty()) { @@ -349,26 +351,16 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } } - public boolean isStrated() + public void failedOverPost() { - return _isStarted; - } - - public void start() - { - _isStarted = true; - if (_syncReceive.get()) + if (_0_10session.isStarted() && _syncReceive.get()) { - _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1); + _0_10session.getQpidSession().messageFlow + (getConsumerTagString(), MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); } } - public void stop() - { - _isStarted = false; - } - /** * When messages are not prefetched we need to request a message from the * broker. @@ -380,16 +372,35 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM */ public Object getMessageFromQueue(long l) throws InterruptedException { - if (isStrated() && ! getSession().prefetch() && _synchronousQueue.isEmpty()) - { - _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1); - } if (! getSession().prefetch()) { _syncReceive.set(true); } + if (_0_10session.isStarted() && ! getSession().prefetch() && _synchronousQueue.isEmpty()) + { + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); + } Object o = super.getMessageFromQueue(l); + if (o == null && _0_10session.isStarted()) + { + _0_10session.getQpidSession().messageFlush + (getConsumerTagString(), Option.UNRELIABLE, Option.SYNC); + _0_10session.getQpidSession().sync(); + _0_10session.getQpidSession().messageFlow + (getConsumerTagString(), MessageCreditUnit.BYTE, + 0xFFFFFFFF, Option.UNRELIABLE); + if (getSession().prefetch()) + { + _0_10session.getQpidSession().messageFlow + (getConsumerTagString(), MessageCreditUnit.MESSAGE, + _0_10session.getAMQConnection().getMaxPrefetch(), + Option.UNRELIABLE); + } + _0_10session.syncDispatchQueue(); + o = super.getMessageFromQueue(-1); + } if (! getSession().prefetch()) { _syncReceive.set(false); @@ -404,6 +415,19 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { _session.acknowledgeMessage(msg.getDeliveryTag(), false); } + + if (_acknowledgeMode == org.apache.qpid.jms.Session.AUTO_ACKNOWLEDGE && + !_session.isInRecovery() && + _session.getAMQConnection().getSyncAck()) + { + ((AMQSession_0_10) getSession()).flushAcknowledgments(); + ((AMQSession_0_10) getSession()).getQpidSession().sync(); + } + } + + Message receiveBrowse() throws JMSException + { + return receiveNoWait(); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index 494a8fb43d..308f04f082 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -22,6 +22,7 @@ package org.apache.qpid.client; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; +import javax.jms.Message; import org.apache.qpid.AMQException; import org.apache.qpid.QpidException; @@ -38,9 +39,9 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe protected final Logger _logger = LoggerFactory.getLogger(getClass()); protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination, - String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, - AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow, - boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) throws JMSException + String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, + AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow, + boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) throws JMSException { super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session, protocolHandler, arguments, prefetchHigh, prefetchLow, exclusive, @@ -73,13 +74,18 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe } } - public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception - { + public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception + { return _messageFactory.createMessage(messageFrame.getDeliveryTag(), - messageFrame.isRedelivered(), messageFrame.getExchange(), - messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies()); + messageFrame.isRedelivered(), messageFrame.getExchange(), + messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies()); + + } + Message receiveBrowse() throws JMSException + { + return receive(); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 954a3bc28f..5ff6066ddc 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -46,6 +46,8 @@ import org.slf4j.LoggerFactory; public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { + enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL }; + protected final Logger _logger = LoggerFactory.getLogger(getClass()); private AMQConnection _connection; @@ -120,6 +122,8 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac protected String _userID; // ref user id used in the connection. private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0]; + + protected PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL; protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory, @@ -141,6 +145,26 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac _mandatory = mandatory; _waitUntilSent = waitUntilSent; _userID = connection.getUsername(); + setPublishMode(); + } + + void setPublishMode() + { + // Publish mode could be configured at destination level as well. + // Will add support for this when we provide a more robust binding URL + + String syncPub = _connection.getSyncPublish(); + // Support for deprecated option sync_persistence + if (syncPub.equals("persistent") || _connection.getSyncPersistence()) + { + publishMode = PublishMode.SYNC_PUBLISH_PERSISTENT; + } + else if (syncPub.equals("all")) + { + publishMode = PublishMode.SYNC_PUBLISH_ALL; + } + + _logger.info("MessageProducer " + toString() + " using publish mode : " + publishMode); } void resubscribe() throws AMQException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 4e5077f0cd..b8c5fc8faf 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -151,9 +151,13 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer ((AMQSession_0_10) getSession()).getQpidSession(); // if true, we need to sync the delivery of this message - boolean sync = (deliveryMode == DeliveryMode.PERSISTENT && - getSession().getAMQConnection().getSyncPersistence()); + boolean sync = false; + sync = ( (publishMode == PublishMode.SYNC_PUBLISH_ALL) || + (publishMode == PublishMode.SYNC_PUBLISH_PERSISTENT && + deliveryMode == DeliveryMode.PERSISTENT) + ); + org.apache.mina.common.ByteBuffer data = message.getData(); ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java b/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java index 986154cda8..3627618e68 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java @@ -47,6 +47,19 @@ public class ClientProperties */ public static final String SYNC_PERSISTENT_PROP_NAME = "sync_persistence"; + /** + * When true a sync command is sent after sending a message ack. + * type: boolean + */ + public static final String SYNC_ACK_PROP_NAME = "sync_ack"; + + /** + * sync_publish property - {persistent|all} + * If set to 'persistent',then persistent messages will be publish synchronously + * If set to 'all', then all messages regardless of the delivery mode will be + * published synchronously. + */ + public static final String SYNC_PUBLISH_PROP_NAME = "sync_publish"; /** * This value will be used in the following settings diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java index 39b9597af1..56e9a5dc73 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java @@ -25,8 +25,6 @@ import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; import javax.jms.JMSException; import javax.jms.MessageFormatException; @@ -35,8 +33,6 @@ import javax.jms.ObjectMessage; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage { @@ -157,7 +153,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag } finally { - _data.rewind(); + // _data.rewind(); close(in); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java b/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java index fab95f754c..f3f74dd332 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java @@ -58,22 +58,31 @@ public class URLParser if ((connection.getHost() == null) || connection.getHost().equals("")) { - String uid = AMQConnectionFactory.getUniqueClientID(); - if (uid == null) - { - throw URLHelper.parseError(-1, "Client Name not specified", fullURL); + String tmp = connection.getAuthority(); + // hack to read a clientid such as "my_clientID" + if (tmp != null && tmp.indexOf('@') < tmp.length()-1) + { + _url.setClientName(tmp.substring(tmp.indexOf('@')+1,tmp.length())); } else { - _url.setClientName(uid); + String uid = AMQConnectionFactory.getUniqueClientID(); + if (uid == null) + { + throw URLHelper.parseError(-1, "Client Name not specified", fullURL); + } + else + { + _url.setClientName(uid); + } } - } + } else { _url.setClientName(connection.getHost()); } - + String userInfo = connection.getUserInfo(); if (userInfo == null) |