From 7806372f270122f742c98f5ff73c4e66b8e8bde7 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Tue, 17 Apr 2012 20:09:37 +0000 Subject: QPID-3953 : [Java AMQP 1-0] Fix durable subscribers git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1327268 13f79535-47bb-0310-9956-ffa450edef68 --- .../amqp_1_0/jms/impl/ConnectionFactoryImpl.java | 16 ++++++++---- .../amqp_1_0/jms/impl/TopicSubscriberImpl.java | 8 ++++++ .../apache/qpid/amqp_1_0/transport/Container.java | 5 +++- .../amqp_1_0/type/security/SaslMechanisms.java | 8 +++--- .../qpid/server/protocol/ProtocolEngine_1_0_0.java | 29 ++++++++++++---------- .../server/protocol/ProtocolEngine_1_0_0_SASL.java | 20 ++++++--------- .../qpid/server/protocol/v1_0/SendingLink_1_0.java | 4 +-- .../qpid/server/protocol/v1_0/Session_1_0.java | 28 ++++++++++++--------- 8 files changed, 68 insertions(+), 50 deletions(-) diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java index 66bf91a53f..d9e6dfe36d 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java @@ -20,16 +20,14 @@ */ package org.apache.qpid.amqp_1_0.jms.impl; -import org.apache.qpid.amqp_1_0.jms.Connection; -import org.apache.qpid.amqp_1_0.jms.ConnectionFactory; - +import java.net.MalformedURLException; +import java.net.URL; import javax.jms.JMSException; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; -import java.net.MalformedURLException; -import java.net.URL; +import org.apache.qpid.amqp_1_0.jms.ConnectionFactory; public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnectionFactory, QueueConnectionFactory { @@ -42,6 +40,14 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection private boolean _ssl; + public ConnectionFactoryImpl(final String host, + final int port, + final String username, + final String password) + { + this(host,port,username,password,null,false); + } + public ConnectionFactoryImpl(final String host, final int port, final String username, diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java index 9550be3779..52d8c412ec 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java @@ -79,6 +79,14 @@ public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSub if(!address.equals(actualAddress) || !filtersEqual(getFilters(), actualFilters)) { receiver.close(); + if(isDurable()) + { + receiver = getSession().getClientSession().createReceiver(address, + StdDistMode.COPY, AcknowledgeMode.ALO, + getLinkName(), false, getFilters(), + null); + receiver.close(); + } receiver = getSession().getClientSession().createReceiver(address, StdDistMode.COPY, AcknowledgeMode.ALO, getLinkName(), isDurable(), getFilters(), diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Container.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Container.java index 1fa5df4640..2e7e2fe2ea 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Container.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Container.java @@ -24,10 +24,13 @@ package org.apache.qpid.amqp_1_0.transport; import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.concurrent.atomic.AtomicInteger; public class Container { + private static final AtomicInteger CONTAINER_ID = new AtomicInteger(0); + private String _id; public Container() @@ -57,7 +60,7 @@ public class Container pid = "unknown"; } - _id = hostname + '(' + pid + ')'; + _id = hostname + '(' + pid + ')' + ':' + CONTAINER_ID.incrementAndGet(); } diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/security/SaslMechanisms.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/security/SaslMechanisms.java index 7f4775c3ae..9598035899 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/security/SaslMechanisms.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/security/SaslMechanisms.java @@ -24,10 +24,10 @@ package org.apache.qpid.amqp_1_0.type.security; +import java.util.Arrays; import org.apache.qpid.amqp_1_0.transport.SASLEndpoint; - - -import org.apache.qpid.amqp_1_0.type.*; +import org.apache.qpid.amqp_1_0.type.SaslFrameBody; +import org.apache.qpid.amqp_1_0.type.Symbol; public class SaslMechanisms implements SaslFrameBody @@ -58,7 +58,7 @@ public class SaslMechanisms { builder.append(','); } - builder.append("saslServerMechanisms=").append(_saslServerMechanisms); + builder.append("saslServerMechanisms=").append(_saslServerMechanisms == null ? "" : Arrays.asList(_saslServerMechanisms)); } builder.append('}'); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java index e6c79a4077..f31ad5052b 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java @@ -20,31 +20,32 @@ */ package org.apache.qpid.server.protocol; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.security.auth.callback.CallbackHandler; import org.apache.qpid.amqp_1_0.codec.FrameWriter; import org.apache.qpid.amqp_1_0.framing.AMQFrame; import org.apache.qpid.amqp_1_0.framing.FrameHandler; import org.apache.qpid.amqp_1_0.framing.OversizeFrameException; -import org.apache.qpid.amqp_1_0.transport.*; +import org.apache.qpid.amqp_1_0.transport.CallbackHandlerSource; +import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; +import org.apache.qpid.amqp_1_0.transport.Container; +import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler; import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.FrameBody; - import org.apache.qpid.protocol.ServerProtocolEngine; -import org.apache.qpid.server.configuration.*; +import org.apache.qpid.server.configuration.ConfigStore; +import org.apache.qpid.server.configuration.ConnectionConfigType; import org.apache.qpid.server.protocol.v1_0.Connection_1_0; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; -import javax.security.auth.callback.CallbackHandler; -import java.io.PrintWriter; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.security.Principal; -import java.util.UUID; -import java.util.logging.*; -import java.util.concurrent.atomic.AtomicLong; - public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHandler { static final AtomicLong _connectionIdSource = new AtomicLong(0L); @@ -94,6 +95,8 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa } private State _state = State.A; + + public ProtocolEngine_1_0_0(final IApplicationRegistry appRegistry, long id) { @@ -138,7 +141,7 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa _network = network; _sender = sender; - Container container = new Container(); + Container container = new Container(_appRegistry.getBrokerId().toString()); _conn = new ConnectionEndpoint(container,asCallbackHandlerSource(_appRegistry.getAuthenticationManager())); _conn.setConnectionEventListener(new Connection_1_0(_appRegistry)); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java index e4487e00f9..ffd5e750b4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java @@ -20,10 +20,16 @@ */ package org.apache.qpid.server.protocol; +import java.io.PrintWriter; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.UUID; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.security.auth.callback.CallbackHandler; import org.apache.qpid.amqp_1_0.codec.FrameWriter; import org.apache.qpid.amqp_1_0.codec.ProtocolHandler; import org.apache.qpid.amqp_1_0.framing.AMQFrame; -import org.apache.qpid.amqp_1_0.framing.FrameHandler; import org.apache.qpid.amqp_1_0.framing.OversizeFrameException; import org.apache.qpid.amqp_1_0.framing.SASLFrameHandler; import org.apache.qpid.amqp_1_0.transport.CallbackHandlerSource; @@ -32,7 +38,6 @@ import org.apache.qpid.amqp_1_0.transport.Container; import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler; import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.FrameBody; - import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConnectionConfigType; @@ -43,15 +48,6 @@ import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; -import javax.security.auth.callback.CallbackHandler; -import java.io.PrintWriter; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.security.Principal; -import java.util.UUID; -import java.util.logging.Level; -import java.util.logging.Logger; - public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOutputHandler { private long _readBytes; @@ -165,7 +161,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut _network = network; _sender = sender; - Container container = new Container(); + Container container = new Container(_appRegistry.getBrokerId().toString()); _conn = new ConnectionEndpoint(container, asCallbackHandlerSource(ApplicationRegistry.getInstance() .getAuthenticationManager())); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 8d227d9677..b3e9a74d04 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -347,10 +347,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { //TODO // if not durable or close - if(!TerminusDurability.UNSETTLED_STATE.equals(_durability) || - (detach != null && Boolean.TRUE.equals(detach.getClosed()))) + if(!TerminusDurability.UNSETTLED_STATE.equals(_durability)) { - AMQQueue queue = _subscription.getQueue(); try diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index ef298b4731..48a551e42a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -24,11 +24,10 @@ import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint; import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; import org.apache.qpid.amqp_1_0.transport.SessionEventListener; -import org.apache.qpid.amqp_1_0.type.AmqpErrorException; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.LifetimePolicy; -import org.apache.qpid.amqp_1_0.type.Symbol; +import org.apache.qpid.amqp_1_0.type.*; import org.apache.qpid.amqp_1_0.type.messaging.*; +import org.apache.qpid.amqp_1_0.type.messaging.Source; +import org.apache.qpid.amqp_1_0.type.messaging.Target; import org.apache.qpid.amqp_1_0.type.transaction.Coordinator; import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability; import org.apache.qpid.amqp_1_0.type.transport.*; @@ -143,13 +142,6 @@ public class Session_1_0 implements SessionEventListener if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable())) { linkRegistry.registerSendingLink(endpoint.getName(), sendingLink); - sendingLink.setCloseAction(new Runnable() { - - public void run() - { - linkRegistry.unregisterSendingLink(endpoint.getName()); - } - }); } } catch(AmqpErrorException e) @@ -163,7 +155,19 @@ public class Session_1_0 implements SessionEventListener } else { - endpoint.setSource(previousLink.getEndpoint().getSource()); + Source newSource = (Source) endpoint.getSource(); + + Source oldSource = (Source) previousLink.getEndpoint().getSource(); + final TerminusDurability newSourceDurable = newSource == null ? null : newSource.getDurable(); + if(newSourceDurable != null) + { + oldSource.setDurable(newSourceDurable); + if(newSourceDurable.equals(TerminusDurability.NONE)) + { + linkRegistry.unregisterSendingLink(endpoint.getName()); + } + } + endpoint.setSource(oldSource); SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint; previousLink.setLinkAttachment(new SendingLinkAttachment(this, sendingLinkEndpoint)); sendingLinkEndpoint.setLinkEventListener(previousLink); -- cgit v1.2.1