diff options
author | Stephen D. Huston <shuston@apache.org> | 2009-03-10 23:10:57 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2009-03-10 23:10:57 +0000 |
commit | a3047f65e740f287b2ced28ed6f916f18d15e7e3 (patch) | |
tree | ce18a6fe93e31ff308dd41348e879ccb594b92a6 /qpid/java/client | |
parent | ef83bd90573df8a376cbaef4b01b1b4300ffe1ce (diff) | |
download | qpid-python-a3047f65e740f287b2ced28ed6f916f18d15e7e3.tar.gz |
Merge in trunk to r752275
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-1673@752300 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
24 files changed, 512 insertions, 195 deletions
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java index 93bb097268..1ac3e85f7a 100755 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java @@ -42,6 +42,8 @@ public class Listener implements SessionListener public void opened(Session ssn) {} + public void resumed(Session ssn) {} + public void message(Session ssn, MessageTransfer xfr) { System.out.println("Message: " + xfr); diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java index 4c72ce75a5..21f9c43cd2 100755 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java @@ -42,6 +42,8 @@ public class Listener implements SessionListener public void opened(Session ssn) {} + public void resumed(Session ssn) {} + public void message(Session ssn, MessageTransfer xfr) { System.out.println("Message: " + xfr); diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/Listener.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/Listener.java index aa1c9b0a41..dff49228a1 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/Listener.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/Listener.java @@ -32,6 +32,8 @@ public class Listener implements SessionListener public void opened(Session ssn) {} + public void resumed(Session ssn) {} + public void message(Session ssn, MessageTransfer xfr) { String body = xfr.getBodyString(); diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/lvq/Listener.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/lvq/Listener.java index b930062813..e17d3eef9f 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/lvq/Listener.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/lvq/Listener.java @@ -44,6 +44,8 @@ public class Listener implements SessionListener public void opened(Session ssn) {} + public void resumed(Session ssn) {} + public void message(Session ssn, MessageTransfer xfr) { String body = xfr.getBodyString(); diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java index 5e6d3c6f69..dd9307ca84 100755 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java @@ -40,6 +40,8 @@ public class TopicListener implements SessionListener public void opened(Session ssn) {} + public void resumed(Session ssn) {} + public void message(Session ssn, MessageTransfer xfr) { DeliveryProperties dp = xfr.getHeader().get(DeliveryProperties.class); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 6c9fcc0f4c..5c48d73e43 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -268,6 +268,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect //Indicates whether persistent messages are synchronized private boolean _syncPersistence; + //Indicates whether we need to sync on every message ack + private boolean _syncAck; + + //Indicates the sync publish options (persistent|all) + //By default it's async publish + private String _syncPublish = ""; + /** * @param broker brokerdetails * @param username username @@ -348,25 +355,53 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException { // set this connection maxPrefetch - if (connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH) != null) + if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null) { - _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH)); + _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH)); } else { // use the defaul value set for all connections _maxPrefetch = Integer.parseInt(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, - ClientProperties.MAX_PREFETCH_DEFAULT)); + ClientProperties.MAX_PREFETCH_DEFAULT)); } - if (connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE) != null) + if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE) != null) { - _syncPersistence = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE)); + _syncPersistence = + Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE)); + _logger.warn("sync_persistence is a deprecated property, " + + "please use sync_publish={persistent|all} instead"); } else { // use the defaul value set for all connections _syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME); + if (_syncPersistence) + { + _logger.warn("sync_persistence is a deprecated property, " + + "please use sync_publish={persistent|all} instead"); + } + } + + if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK) != null) + { + _syncAck = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK)); + } + else + { + // use the defaul value set for all connections + _syncAck = Boolean.getBoolean(ClientProperties.SYNC_ACK_PROP_NAME); + } + + if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH) != null) + { + _syncPublish = connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH); + } + else + { + // use the defaul value set for all connections + _syncPublish = System.getProperty((ClientProperties.SYNC_ACK_PROP_NAME),_syncPublish); } _failoverPolicy = new FailoverPolicy(connectionURL, this); @@ -1469,6 +1504,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _syncPersistence; } + /** + * Indicates whether we need to sync on every message ack + */ + public boolean getSyncAck() + { + return _syncAck; + } + + public String getSyncPublish() + { + return _syncPublish; + } + public void setIdleTimeout(long l) { _delegate.setIdleTimeout(l); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 29f1aec2f5..c2fb05d94e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -239,12 +239,6 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { _conn.failoverPrep(); _qpidConnection.resume(); - - if (_conn.firePreResubscribe()) - { - _conn.resubscribeSessions(); - } - _conn.fireFailoverComplete(); return; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java index 08fd49286b..d96544adf8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java @@ -22,14 +22,12 @@ package org.apache.qpid.client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.qpid.AMQException; import javax.jms.IllegalStateException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Queue; import javax.jms.QueueBrowser; - import java.util.ArrayList; import java.util.Enumeration; import java.util.concurrent.atomic.AtomicBoolean; @@ -90,38 +88,11 @@ public class AMQQueueBrowser implements QueueBrowser { checkState(); final BasicMessageConsumer consumer = - (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false); + (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false); _consumers.add(consumer); - return new Enumeration() - { - - Message _nextMessage = consumer == null ? null : consumer.receive(1000); - - public boolean hasMoreElements() - { - _logger.info("QB:hasMoreElements:" + (_nextMessage != null)); - return (_nextMessage != null); - } - - public Object nextElement() - { - Message msg = _nextMessage; - try - { - _logger.info("QB:nextElement about to receive"); - _nextMessage = consumer.receive(1000); - _logger.info("QB:nextElement received:" + _nextMessage); - } - catch (JMSException e) - { - _logger.warn("Exception caught while queue browsing", e); - _nextMessage = null; - } - return msg; - } - }; + return new QueueBrowserEnumeration(consumer); } public void close() throws JMSException @@ -134,4 +105,39 @@ public class AMQQueueBrowser implements QueueBrowser _consumers.clear(); } + private class QueueBrowserEnumeration implements Enumeration + { + Message _nextMessage; + private BasicMessageConsumer _consumer; + + public QueueBrowserEnumeration(BasicMessageConsumer consumer) throws JMSException + { + _nextMessage = consumer == null ? null : consumer.receiveBrowse(); + _logger.info("QB:created with first element:" + _nextMessage); + _consumer = consumer; + } + + public boolean hasMoreElements() + { + _logger.info("QB:hasMoreElements:" + (_nextMessage != null)); + return (_nextMessage != null); + } + + public Object nextElement() + { + Message msg = _nextMessage; + try + { + _logger.info("QB:nextElement about to receive"); + _nextMessage = _consumer.receiveBrowse(); + _logger.info("QB:nextElement received:" + _nextMessage); + } + catch (JMSException e) + { + _logger.warn("Exception caught while queue browsing", e); + _nextMessage = null; + } + return msg; + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 733bee2d81..9012632adf 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -575,12 +575,19 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, final AMQShortString exchangeName, final AMQDestination destination) throws AMQException { + bindQueue(queueName, routingKey, arguments, exchangeName, destination, false); + } + + public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, + final AMQShortString exchangeName, final AMQDestination destination, + final boolean nowait) throws AMQException + { /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() { public Object execute() throws AMQException, FailoverException { - sendQueueBind(queueName, routingKey, arguments, exchangeName, destination); + sendQueueBind(queueName, routingKey, arguments, exchangeName, destination, nowait); return null; } }, _connection).execute(); @@ -595,7 +602,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName, AMQDestination destination) throws AMQException, FailoverException; + final AMQShortString exchangeName, AMQDestination destination, + final boolean nowait) throws AMQException, FailoverException; /** * Closes the session. @@ -1815,6 +1823,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic void failoverPrep() { startDispatcherIfNecessary(); + syncDispatchQueue(); + } + + void syncDispatchQueue() + { final CountDownLatch signal = new CountDownLatch(1); _queue.add(new Dispatchable() { public void dispatch(AMQSession ssn) @@ -1828,7 +1841,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } catch (InterruptedException e) { - // pass + throw new RuntimeException(e); } } @@ -1859,6 +1872,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _inRecovery = inRecovery; } + boolean isStarted() + { + return _startedAtLeastOnce.get(); + } + /** * Starts the session, which ensures that it is not suspended and that its event dispatcher is running. * @@ -2281,7 +2299,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * @todo Be aware of possible changes to parameter order as versions change. */ protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean noLocal) + final boolean noLocal) throws AMQException + { + return declareQueue(amqd, protocolHandler, noLocal, false); + } + + protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + final boolean noLocal, final boolean nowait) throws AMQException { /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/ @@ -2296,14 +2320,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic amqd.setQueueName(protocolHandler.generateQueueName()); } - sendQueueDeclare(amqd, protocolHandler); + sendQueueDeclare(amqd, protocolHandler, nowait); return amqd.getAMQQueueName(); } }, _connection).execute(); } - public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException; + public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + final boolean nowait) throws AMQException, FailoverException; /** * Undeclares the specified queue. @@ -2416,14 +2441,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic AMQProtocolHandler protocolHandler = getProtocolHandler(); - declareExchange(amqd, protocolHandler, false); + declareExchange(amqd, protocolHandler, nowait); - AMQShortString queueName = declareQueue(amqd, protocolHandler, consumer.isNoLocal()); + AMQShortString queueName = declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait); // store the consumer queue name consumer.setQueuename(queueName); - bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd); + bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait); // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch if (!_immediatePrefetch) @@ -2455,11 +2480,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { - consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector()); - } - catch (JMSException e) // thrown by getMessageSelector - { - throw new AMQException(null, e.getMessage(), e); + consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer._messageSelector); } catch (FailoverException e) { @@ -2531,8 +2552,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic for (C consumer : consumers) { - consumer.failedOver(); + consumer.failedOverPre(); registerConsumer(consumer, true); + consumer.failedOverPost(); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index a8487b04e9..34457d745f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -135,7 +135,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { try { - flushAcknowledgments(); + flushAcknowledgments(true); } catch (Throwable t) { @@ -236,12 +236,17 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic void flushAcknowledgments() { + flushAcknowledgments(false); + } + + void flushAcknowledgments(boolean setSyncBit) + { synchronized (unacked) { if (unackedCount > 0) { messageAcknowledge - (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); + (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE,setSyncBit); clearUnacked(); } } @@ -249,6 +254,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic void messageAcknowledge(RangeSet ranges, boolean accept) { + messageAcknowledge(ranges,accept,false); + } + + void messageAcknowledge(RangeSet ranges, boolean accept,boolean setSyncBit) + { Session ssn = getQpidSession(); for (Range range : ranges) { @@ -257,7 +267,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic ssn.flushProcessed(accept ? BATCH : NONE); if (accept) { - ssn.messageAccept(ranges, UNRELIABLE); + ssn.messageAccept(ranges, UNRELIABLE,setSyncBit? SYNC : NONE); } } @@ -272,7 +282,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * @param arguments 0_8 specific */ public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, - final FieldTable arguments, final AMQShortString exchangeName, final AMQDestination destination) + final FieldTable arguments, final AMQShortString exchangeName, + final AMQDestination destination, final boolean nowait) throws AMQException, FailoverException { Map args = FiledTableSupport.convertToMap(arguments); @@ -287,9 +298,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic _logger.debug("Binding queue : " + queueName.toString() + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString()); getQpidSession().exchangeBind(queueName.toString(), exchangeName.toString(), rk.toString(), args); } - // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + if (!nowait) + { + // We need to sync so that we get notify of an error. + getQpidSession().sync(); + getCurrentException(); + } } @@ -501,18 +515,24 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.WINDOW); } - getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF); + getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF, + Option.UNRELIABLE); // We need to sync so that we get notify of an error. // only if not immediat prefetch - if(prefetch() && (consumer.isStrated() || _immediatePrefetch)) + if(prefetch() && (isStarted() || _immediatePrefetch)) { // set the flow getQpidSession().messageFlow(consumerTag, MessageCreditUnit.MESSAGE, - getAMQConnection().getMaxPrefetch()); + getAMQConnection().getMaxPrefetch(), + Option.UNRELIABLE); + } + + if (!nowait) + { + getQpidSession().sync(); + getCurrentException(); } - getQpidSession().sync(); - getCurrentException(); } /** @@ -540,14 +560,18 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic null, name.toString().startsWith("amq.")? Option.PASSIVE:Option.NONE); // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + if (!nowait) + { + getQpidSession().sync(); + getCurrentException(); + } } /** * Declare a queue with the given queueName */ - public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) + public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + final boolean nowait) throws AMQException, FailoverException { // do nothing this is only used by 0_8 @@ -557,7 +581,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * Declare a queue with the given queueName */ public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean noLocal) + final boolean noLocal, final boolean nowait) throws AMQException, FailoverException { AMQShortString res; @@ -581,9 +605,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic amqd.isDurable() ? Option.DURABLE : Option.NONE, !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE); // passive --> false - // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + if (!nowait) + { + // We need to sync so that we get notify of an error. + getQpidSession().sync(); + getCurrentException(); + } return res; } @@ -609,7 +636,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { for (BasicMessageConsumer consumer : _consumers.values()) { - getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag())); + getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()), + Option.UNRELIABLE); } } else @@ -625,17 +653,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic if (consumer.getMessageListener() != null) { getQpidSession().messageFlow(consumerTag, - MessageCreditUnit.MESSAGE, 1); + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); } } else { getQpidSession() .messageFlow(consumerTag, MessageCreditUnit.MESSAGE, - getAMQConnection().getMaxPrefetch()); + getAMQConnection().getMaxPrefetch(), + Option.UNRELIABLE); } getQpidSession() - .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF); + .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF, + Option.UNRELIABLE); } catch (Exception e) { @@ -700,6 +731,19 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void opened(Session ssn) {} + public void resumed(Session ssn) + { + _qpidConnection = ssn.getConnection(); + try + { + resubscribe(); + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + } + public void message(Session ssn, MessageTransfer xfr) { messageReceived(new UnprocessedMessage_0_10(xfr)); @@ -716,7 +760,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void closed(Session ssn) {} protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean noLocal) + final boolean noLocal, final boolean nowait) throws AMQException { /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/ @@ -736,34 +780,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic amqd.setQueueName(new AMQShortString( binddingKey + "@" + amqd.getExchangeName().toString() + "_" + UUID.randomUUID())); } - return send0_10QueueDeclare(amqd, protocolHandler, noLocal); + return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait); } }, _connection).execute(); } - - void start() throws AMQException - { - super.start(); - for(BasicMessageConsumer c: _consumers.values()) - { - c.start(); - } - } - - - void stop() throws AMQException - { - super.stop(); - for(BasicMessageConsumer c: _consumers.values()) - { - c.stop(); - } - } - - - - public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 6451ae60be..ff8631c12e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -106,7 +106,8 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B } public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName, final AMQDestination dest) throws AMQException, FailoverException + final AMQShortString exchangeName, final AMQDestination dest, + final boolean nowait) throws AMQException, FailoverException { getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody (getTicket(),queueName,exchangeName,routingKey,false,arguments). @@ -300,13 +301,14 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B { ExchangeDeclareBody body = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type, name.toString().startsWith("amq."), - false,false,false,nowait,null); + false,false,false,false,null); AMQFrame exchangeDeclare = body.generateFrame(_channelId); protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); } - public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException + public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + final boolean nowait) throws AMQException, FailoverException { QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 76422c6297..2bb443a090 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -441,7 +441,9 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa o = _synchronousQueue.take(); } return o; - } + } + + abstract Message receiveBrowse() throws JMSException; public Message receiveNoWait() throws JMSException { @@ -1037,23 +1039,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa _synchronousQueue.clear(); } - public void start() - { - // do nothing as this is a 0_10 feature - } - - - public void stop() - { - // do nothing as this is a 0_10 feature - } - - public boolean isStrated() - { - // do nothing as this is a 0_10 feature - return false; - } - public AMQShortString getQueuename() { return _queuename; @@ -1070,10 +1055,13 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa } /** to be called when a failover has occured */ - public void failedOver() + public void failedOverPre() { clearReceiveQueue(); // TGM FIXME: think this should just be removed // clearUnackedMessages(); } + + public void failedOverPost() {} + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 7d535643c0..8b17dcf91f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -31,6 +31,7 @@ import org.apache.qpid.filter.JMSSelectorFilter; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; +import javax.jms.Message; import javax.jms.MessageListener; import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; @@ -148,7 +149,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM if (isMessageListenerSet() && ! getSession().prefetch()) { _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1); + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); } _logger.debug("messageOk, trying to notify"); super.notifyMessage(jmsMessage); @@ -246,7 +248,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM if(! getSession().prefetch()) { _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1); + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); } } // now we need to acquire this message if needed @@ -258,9 +261,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM _logger.debug("filterMessage - trying to acquire message"); } messageOk = acquireMessage(message); - _logger.debug("filterMessage - *************************************"); _logger.debug("filterMessage - message acquire status : " + messageOk); - _logger.debug("filterMessage - *************************************"); } return messageOk; } @@ -335,7 +336,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM if (messageListener != null && ! getSession().prefetch()) { _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1); + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); } if (messageListener != null && !_synchronousQueue.isEmpty()) { @@ -349,26 +351,16 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } } - public boolean isStrated() + public void failedOverPost() { - return _isStarted; - } - - public void start() - { - _isStarted = true; - if (_syncReceive.get()) + if (_0_10session.isStarted() && _syncReceive.get()) { - _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1); + _0_10session.getQpidSession().messageFlow + (getConsumerTagString(), MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); } } - public void stop() - { - _isStarted = false; - } - /** * When messages are not prefetched we need to request a message from the * broker. @@ -380,16 +372,35 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM */ public Object getMessageFromQueue(long l) throws InterruptedException { - if (isStrated() && ! getSession().prefetch() && _synchronousQueue.isEmpty()) - { - _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1); - } if (! getSession().prefetch()) { _syncReceive.set(true); } + if (_0_10session.isStarted() && ! getSession().prefetch() && _synchronousQueue.isEmpty()) + { + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); + } Object o = super.getMessageFromQueue(l); + if (o == null && _0_10session.isStarted()) + { + _0_10session.getQpidSession().messageFlush + (getConsumerTagString(), Option.UNRELIABLE, Option.SYNC); + _0_10session.getQpidSession().sync(); + _0_10session.getQpidSession().messageFlow + (getConsumerTagString(), MessageCreditUnit.BYTE, + 0xFFFFFFFF, Option.UNRELIABLE); + if (getSession().prefetch()) + { + _0_10session.getQpidSession().messageFlow + (getConsumerTagString(), MessageCreditUnit.MESSAGE, + _0_10session.getAMQConnection().getMaxPrefetch(), + Option.UNRELIABLE); + } + _0_10session.syncDispatchQueue(); + o = super.getMessageFromQueue(-1); + } if (! getSession().prefetch()) { _syncReceive.set(false); @@ -404,6 +415,19 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { _session.acknowledgeMessage(msg.getDeliveryTag(), false); } + + if (_acknowledgeMode == org.apache.qpid.jms.Session.AUTO_ACKNOWLEDGE && + !_session.isInRecovery() && + _session.getAMQConnection().getSyncAck()) + { + ((AMQSession_0_10) getSession()).flushAcknowledgments(); + ((AMQSession_0_10) getSession()).getQpidSession().sync(); + } + } + + Message receiveBrowse() throws JMSException + { + return receiveNoWait(); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index 494a8fb43d..308f04f082 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -22,6 +22,7 @@ package org.apache.qpid.client; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; +import javax.jms.Message; import org.apache.qpid.AMQException; import org.apache.qpid.QpidException; @@ -38,9 +39,9 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe protected final Logger _logger = LoggerFactory.getLogger(getClass()); protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination, - String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, - AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow, - boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) throws JMSException + String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, + AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow, + boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) throws JMSException { super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session, protocolHandler, arguments, prefetchHigh, prefetchLow, exclusive, @@ -73,13 +74,18 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe } } - public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception - { + public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception + { return _messageFactory.createMessage(messageFrame.getDeliveryTag(), - messageFrame.isRedelivered(), messageFrame.getExchange(), - messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies()); + messageFrame.isRedelivered(), messageFrame.getExchange(), + messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies()); + + } + Message receiveBrowse() throws JMSException + { + return receive(); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 954a3bc28f..5ff6066ddc 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -46,6 +46,8 @@ import org.slf4j.LoggerFactory; public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { + enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL }; + protected final Logger _logger = LoggerFactory.getLogger(getClass()); private AMQConnection _connection; @@ -120,6 +122,8 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac protected String _userID; // ref user id used in the connection. private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0]; + + protected PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL; protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory, @@ -141,6 +145,26 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac _mandatory = mandatory; _waitUntilSent = waitUntilSent; _userID = connection.getUsername(); + setPublishMode(); + } + + void setPublishMode() + { + // Publish mode could be configured at destination level as well. + // Will add support for this when we provide a more robust binding URL + + String syncPub = _connection.getSyncPublish(); + // Support for deprecated option sync_persistence + if (syncPub.equals("persistent") || _connection.getSyncPersistence()) + { + publishMode = PublishMode.SYNC_PUBLISH_PERSISTENT; + } + else if (syncPub.equals("all")) + { + publishMode = PublishMode.SYNC_PUBLISH_ALL; + } + + _logger.info("MessageProducer " + toString() + " using publish mode : " + publishMode); } void resubscribe() throws AMQException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 4e5077f0cd..b8c5fc8faf 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -151,9 +151,13 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer ((AMQSession_0_10) getSession()).getQpidSession(); // if true, we need to sync the delivery of this message - boolean sync = (deliveryMode == DeliveryMode.PERSISTENT && - getSession().getAMQConnection().getSyncPersistence()); + boolean sync = false; + sync = ( (publishMode == PublishMode.SYNC_PUBLISH_ALL) || + (publishMode == PublishMode.SYNC_PUBLISH_PERSISTENT && + deliveryMode == DeliveryMode.PERSISTENT) + ); + org.apache.mina.common.ByteBuffer data = message.getData(); ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java b/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java index 986154cda8..3627618e68 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java @@ -47,6 +47,19 @@ public class ClientProperties */ public static final String SYNC_PERSISTENT_PROP_NAME = "sync_persistence"; + /** + * When true a sync command is sent after sending a message ack. + * type: boolean + */ + public static final String SYNC_ACK_PROP_NAME = "sync_ack"; + + /** + * sync_publish property - {persistent|all} + * If set to 'persistent',then persistent messages will be publish synchronously + * If set to 'all', then all messages regardless of the delivery mode will be + * published synchronously. + */ + public static final String SYNC_PUBLISH_PROP_NAME = "sync_publish"; /** * This value will be used in the following settings diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java index 39b9597af1..56e9a5dc73 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java @@ -25,8 +25,6 @@ import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; import javax.jms.JMSException; import javax.jms.MessageFormatException; @@ -35,8 +33,6 @@ import javax.jms.ObjectMessage; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage { @@ -157,7 +153,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag } finally { - _data.rewind(); + // _data.rewind(); close(in); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java b/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java index fab95f754c..f3f74dd332 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java @@ -58,22 +58,31 @@ public class URLParser if ((connection.getHost() == null) || connection.getHost().equals("")) { - String uid = AMQConnectionFactory.getUniqueClientID(); - if (uid == null) - { - throw URLHelper.parseError(-1, "Client Name not specified", fullURL); + String tmp = connection.getAuthority(); + // hack to read a clientid such as "my_clientID" + if (tmp != null && tmp.indexOf('@') < tmp.length()-1) + { + _url.setClientName(tmp.substring(tmp.indexOf('@')+1,tmp.length())); } else { - _url.setClientName(uid); + String uid = AMQConnectionFactory.getUniqueClientID(); + if (uid == null) + { + throw URLHelper.parseError(-1, "Client Name not specified", fullURL); + } + else + { + _url.setClientName(uid); + } } - } + } else { _url.setClientName(connection.getHost()); } - + String userInfo = connection.getUserInfo(); if (userInfo == null) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java index da8cd4f750..03ab967c36 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java @@ -33,9 +33,11 @@ import java.util.List; */ public interface ConnectionURL { - public static final String AMQ_SYNC_PERSISTENCE = "sync_persistence"; - public static final String AMQ_MAXPREFETCH = "maxprefetch"; public static final String AMQ_PROTOCOL = "amqp"; + public static final String OPTIONS_SYNC_PERSISTENCE = "sync_persistence"; + public static final String OPTIONS_MAXPREFETCH = "maxprefetch"; + public static final String OPTIONS_SYNC_ACK = "sync_ack"; + public static final String OPTIONS_SYNC_PUBLISH = "sync_publish"; public static final String OPTIONS_BROKERLIST = "brokerlist"; public static final String OPTIONS_FAILOVER = "failover"; public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount"; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java index 6f945687cf..e05a7ab6e2 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.jms.failover; +import java.net.InetAddress; import java.util.ArrayList; import java.util.List; @@ -52,7 +53,7 @@ import org.slf4j.LoggerFactory; * from the list. */ -public class FailoverExchangeMethod extends FailoverRoundRobinServers implements FailoverMethod, MessageListener +public class FailoverExchangeMethod implements FailoverMethod, MessageListener { private static final Logger _logger = LoggerFactory.getLogger(FailoverExchangeMethod.class); @@ -65,17 +66,29 @@ public class FailoverExchangeMethod extends FailoverRoundRobinServers implements /** The session used to subscribe to failover exchange */ private Session _ssn; - private BrokerDetails _orginalBrokerDetail; + private BrokerDetails _originalBrokerDetail; + + /** The index into the hostDetails array of the broker to which we are connected */ + private int _currentBrokerIndex = 0; + + /** The broker currently selected **/ + private BrokerDetails _currentBrokerDetail; + + /** Array of BrokerDetail used to make connections. */ + private ConnectionURL _connectionDetails; + + /** Denotes the number of failed attempts **/ + private int _failedAttemps = 0; public FailoverExchangeMethod(ConnectionURL connectionDetails, AMQConnection conn) { - super(connectionDetails); - _orginalBrokerDetail = _connectionDetails.getBrokerDetails(0); + _connectionDetails = connectionDetails; + _originalBrokerDetail = _connectionDetails.getBrokerDetails(0); // This is not safe to use until attainConnection is called, as this ref will not initialized fully. // The reason being this constructor is called inside the AMWConnection constructor. // It would be best if we find a way to pass this ref after AMQConnection is fully initialized. - _conn = conn; + _conn = conn; } private void subscribeForUpdates() throws JMSException @@ -96,6 +109,17 @@ public class FailoverExchangeMethod extends FailoverRoundRobinServers implements public void onMessage(Message m) { _logger.info("Failover exchange notified cluster membership change"); + + String currentBrokerIP = ""; + try + { + currentBrokerIP = InetAddress.getByName(_currentBrokerDetail.getHost()).getHostAddress(); + } + catch(Exception e) + { + _logger.warn("Unable to resolve current broker host name",e); + } + List<BrokerDetails> brokerList = new ArrayList<BrokerDetails>(); try { @@ -109,15 +133,22 @@ public class FailoverExchangeMethod extends FailoverRoundRobinServers implements for (String url:urls) { String[] tokens = url.split(":"); - if (tokens[0].equalsIgnoreCase(_orginalBrokerDetail.getTransport())) + if (tokens[0].equalsIgnoreCase(_originalBrokerDetail.getTransport())) { BrokerDetails broker = new AMQBrokerDetails(); broker.setTransport(tokens[0]); broker.setHost(tokens[1]); broker.setPort(Integer.parseInt(tokens[2])); - broker.setProperties(_orginalBrokerDetail.getProperties()); - broker.setSSLConfiguration(_orginalBrokerDetail.getSSLConfiguration()); + broker.setProperties(_originalBrokerDetail.getProperties()); + broker.setSSLConfiguration(_originalBrokerDetail.getSSLConfiguration()); brokerList.add(broker); + + if (currentBrokerIP.equals(broker.getHost()) && + _currentBrokerDetail.getPort() == broker.getPort()) + { + _currentBrokerIndex = brokerList.indexOf(broker); + } + break; } } @@ -132,13 +163,20 @@ public class FailoverExchangeMethod extends FailoverRoundRobinServers implements { _connectionDetails.setBrokerDetails(brokerList); } + + _logger.info("============================================================"); + _logger.info("Updated cluster membership details " + _connectionDetails); + _logger.info("============================================================"); } public void attainedConnection() { - super.attainedConnection(); try { + _failedAttemps = 0; + _logger.info("============================================================"); + _logger.info("Attained connection "); + _logger.info("============================================================"); subscribeForUpdates(); } catch (JMSException e) @@ -151,17 +189,92 @@ public class FailoverExchangeMethod extends FailoverRoundRobinServers implements { synchronized (_brokerListLock) { - return super.getCurrentBrokerDetails(); + return _connectionDetails.getBrokerDetails(_currentBrokerIndex); } - } - + } + public BrokerDetails getNextBrokerDetails() { synchronized(_brokerListLock) { - return super.getNextBrokerDetails(); + if (_currentBrokerIndex == (_connectionDetails.getBrokerCount() - 1)) + { + _currentBrokerIndex = 0; + } + else + { + _currentBrokerIndex++; + } + + BrokerDetails broker = _connectionDetails.getBrokerDetails(_currentBrokerIndex); + + // When the broker list is updated it will include the current broker as well + // There is no point trying it again, so trying the next one. + if (_currentBrokerDetail != null && + broker.getHost().equals(_currentBrokerDetail.getHost()) && + broker.getPort() == _currentBrokerDetail.getPort()) + { + return getNextBrokerDetails(); + } + + String delayStr = broker.getProperty(BrokerDetails.OPTIONS_CONNECT_DELAY); + if (delayStr != null) + { + Long delay = Long.parseLong(delayStr); + _logger.info("Delay between connect retries:" + delay); + try + { + Thread.sleep(delay); + } + catch (InterruptedException ie) + { + return null; + } + } + else + { + _logger.info("No delay between connect retries, use tcp://host:port?connectdelay='value' to enable."); + } + + _failedAttemps ++; + _currentBrokerDetail = broker; + return broker; } } + + public boolean failoverAllowed() + { + // We allow to Failover provided + // our broker list is not empty and + // we haven't gone through all of them + + boolean b = _connectionDetails.getBrokerCount() > 0 && + _failedAttemps <= _connectionDetails.getBrokerCount(); + + + _logger.info("============================================================"); + _logger.info(toString()); + _logger.info("FailoverAllowed " + b); + _logger.info("============================================================"); + + return b; + } + + public void reset() + { + _failedAttemps = 0; + } + + public void setBroker(BrokerDetails broker) + { + // not sure if this method is needed + } + + public void setRetries(int maxRetries) + { + // no max retries we keep trying as long + // as we get updates + } public String methodName() { @@ -172,7 +285,24 @@ public class FailoverExchangeMethod extends FailoverRoundRobinServers implements { StringBuffer sb = new StringBuffer(); sb.append("FailoverExchange:\n"); - sb.append(super.toString()); + sb.append("\n Current Broker Index:"); + sb.append(_currentBrokerIndex); + sb.append("\n Failed Attempts:"); + sb.append(_failedAttemps); + sb.append("\n Orignal broker details:"); + sb.append(_originalBrokerDetail).append("\n"); + sb.append("\n -------- Broker List -----------\n"); + for (int i = 0; i < _connectionDetails.getBrokerCount(); i++) + { + if (i == _currentBrokerIndex) + { + sb.append(">"); + } + + sb.append(_connectionDetails.getBrokerDetails(i)); + sb.append("\n"); + } + sb.append("--------------------------------\n"); return sb.toString(); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java index 7190344f59..c7d8c69fff 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java @@ -30,7 +30,7 @@ public class FailoverRoundRobinServers implements FailoverMethod private static final Logger _logger = LoggerFactory.getLogger(FailoverRoundRobinServers.class); /** The default number of times to cycle through all servers */ - public static final int DEFAULT_CYCLE_RETRIES = 0; + public static final int DEFAULT_CYCLE_RETRIES = 1; /** The default number of times to retry each server */ public static final int DEFAULT_SERVER_RETRIES = 0; @@ -66,6 +66,8 @@ public class FailoverRoundRobinServers implements FailoverMethod String cycleRetries = _connectionDetails.getFailoverOption(ConnectionURL.OPTIONS_FAILOVER_CYCLE); + _cycleRetries = DEFAULT_CYCLE_RETRIES; + if (cycleRetries != null) { try @@ -74,7 +76,7 @@ public class FailoverRoundRobinServers implements FailoverMethod } catch (NumberFormatException nfe) { - _cycleRetries = DEFAULT_CYCLE_RETRIES; + _logger.warn("Cannot set cycle Retries, " + cycleRetries + " is not a number. Using default: " + DEFAULT_CYCLE_RETRIES); } } @@ -93,8 +95,8 @@ public class FailoverRoundRobinServers implements FailoverMethod public boolean failoverAllowed() { - return ((_currentCycleRetries < _cycleRetries) || (_currentServerRetry < _serverRetries) - || (_currentBrokerIndex < (_connectionDetails.getBrokerCount() - 1))); + return ((_currentCycleRetries < _cycleRetries) || (_currentServerRetry < _serverRetries)); + //|| (_currentBrokerIndex <= (_connectionDetails.getBrokerCount() - 1))); } public void attainedConnection() diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java index d05e90823c..7400b524fd 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java @@ -375,6 +375,19 @@ public class ConnectionURLTest extends TestCase assertTrue(connectionurl.getBrokerCount() == 1); } + public void testClientIDWithUnderscore() throws URLSyntaxException + { + String url = "amqp://user:pass@client_id/test?brokerlist='tcp://localhost:5672'"; + + ConnectionURL connectionurl = new AMQConnectionURL(url); + + assertTrue(connectionurl.getUsername().equals("user")); + assertTrue(connectionurl.getPassword().equals("pass")); + assertTrue(connectionurl.getVirtualHost().equals("/test")); + assertTrue(connectionurl.getClientName().equals("client_id")); + + assertTrue(connectionurl.getBrokerCount() == 1); + } public void testWrongOptionSeparatorInOptions() { diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java index a881f6a822..566a222897 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java @@ -44,7 +44,9 @@ public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMe } - public void sendQueueBind(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments, AMQShortString exchangeName, AMQDestination destination) throws AMQException, FailoverException + public void sendQueueBind(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments, + AMQShortString exchangeName, AMQDestination destination, + boolean nowait) throws AMQException, FailoverException { } @@ -129,7 +131,8 @@ public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMe } - public void sendQueueDeclare(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException, FailoverException + public void sendQueueDeclare(AMQDestination amqd, AMQProtocolHandler protocolHandler, + boolean nowait) throws AMQException, FailoverException { } |