summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-04-17 20:09:37 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-04-17 20:09:37 +0000
commit7806372f270122f742c98f5ff73c4e66b8e8bde7 (patch)
treebd439902eaa745f48c63ffa263315f64f8af2f67
parent07aad7e3cac4568f2226b6e2bec00e9668de9cb4 (diff)
downloadqpid-python-7806372f270122f742c98f5ff73c4e66b8e8bde7.tar.gz
QPID-3953 : [Java AMQP 1-0] Fix durable subscribers
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1327268 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java16
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java8
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Container.java5
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/security/SaslMechanisms.java8
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java29
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java20
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java28
8 files changed, 68 insertions, 50 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
index 66bf91a53f..d9e6dfe36d 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
@@ -20,16 +20,14 @@
*/
package org.apache.qpid.amqp_1_0.jms.impl;
-import org.apache.qpid.amqp_1_0.jms.Connection;
-import org.apache.qpid.amqp_1_0.jms.ConnectionFactory;
-
+import java.net.MalformedURLException;
+import java.net.URL;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
-import java.net.MalformedURLException;
-import java.net.URL;
+import org.apache.qpid.amqp_1_0.jms.ConnectionFactory;
public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnectionFactory, QueueConnectionFactory
{
@@ -45,6 +43,14 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection
public ConnectionFactoryImpl(final String host,
final int port,
final String username,
+ final String password)
+ {
+ this(host,port,username,password,null,false);
+ }
+
+ public ConnectionFactoryImpl(final String host,
+ final int port,
+ final String username,
final String password,
final String clientId)
{
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
index 9550be3779..52d8c412ec 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
@@ -79,6 +79,14 @@ public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSub
if(!address.equals(actualAddress) || !filtersEqual(getFilters(), actualFilters))
{
receiver.close();
+ if(isDurable())
+ {
+ receiver = getSession().getClientSession().createReceiver(address,
+ StdDistMode.COPY, AcknowledgeMode.ALO,
+ getLinkName(), false, getFilters(),
+ null);
+ receiver.close();
+ }
receiver = getSession().getClientSession().createReceiver(address,
StdDistMode.COPY, AcknowledgeMode.ALO,
getLinkName(), isDurable(), getFilters(),
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Container.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Container.java
index 1fa5df4640..2e7e2fe2ea 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Container.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Container.java
@@ -24,10 +24,13 @@ package org.apache.qpid.amqp_1_0.transport;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicInteger;
public class Container
{
+ private static final AtomicInteger CONTAINER_ID = new AtomicInteger(0);
+
private String _id;
public Container()
@@ -57,7 +60,7 @@ public class Container
pid = "unknown";
}
- _id = hostname + '(' + pid + ')';
+ _id = hostname + '(' + pid + ')' + ':' + CONTAINER_ID.incrementAndGet();
}
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/security/SaslMechanisms.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/security/SaslMechanisms.java
index 7f4775c3ae..9598035899 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/security/SaslMechanisms.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/security/SaslMechanisms.java
@@ -24,10 +24,10 @@
package org.apache.qpid.amqp_1_0.type.security;
+import java.util.Arrays;
import org.apache.qpid.amqp_1_0.transport.SASLEndpoint;
-
-
-import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
+import org.apache.qpid.amqp_1_0.type.Symbol;
public class SaslMechanisms
implements SaslFrameBody
@@ -58,7 +58,7 @@ public class SaslMechanisms
{
builder.append(',');
}
- builder.append("saslServerMechanisms=").append(_saslServerMechanisms);
+ builder.append("saslServerMechanisms=").append(_saslServerMechanisms == null ? "" : Arrays.asList(_saslServerMechanisms));
}
builder.append('}');
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
index e6c79a4077..f31ad5052b 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
@@ -20,31 +20,32 @@
*/
package org.apache.qpid.server.protocol;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.security.auth.callback.CallbackHandler;
import org.apache.qpid.amqp_1_0.codec.FrameWriter;
import org.apache.qpid.amqp_1_0.framing.AMQFrame;
import org.apache.qpid.amqp_1_0.framing.FrameHandler;
import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
-import org.apache.qpid.amqp_1_0.transport.*;
+import org.apache.qpid.amqp_1_0.transport.CallbackHandlerSource;
+import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
+import org.apache.qpid.amqp_1_0.transport.Container;
+import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.FrameBody;
-
import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.server.configuration.*;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConnectionConfigType;
import org.apache.qpid.server.protocol.v1_0.Connection_1_0;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
-import javax.security.auth.callback.CallbackHandler;
-import java.io.PrintWriter;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.security.Principal;
-import java.util.UUID;
-import java.util.logging.*;
-import java.util.concurrent.atomic.AtomicLong;
-
public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHandler
{
static final AtomicLong _connectionIdSource = new AtomicLong(0L);
@@ -94,6 +95,8 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
}
private State _state = State.A;
+
+
public ProtocolEngine_1_0_0(final IApplicationRegistry appRegistry, long id)
{
@@ -138,7 +141,7 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
_network = network;
_sender = sender;
- Container container = new Container();
+ Container container = new Container(_appRegistry.getBrokerId().toString());
_conn = new ConnectionEndpoint(container,asCallbackHandlerSource(_appRegistry.getAuthenticationManager()));
_conn.setConnectionEventListener(new Connection_1_0(_appRegistry));
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
index e4487e00f9..ffd5e750b4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
@@ -20,10 +20,16 @@
*/
package org.apache.qpid.server.protocol;
+import java.io.PrintWriter;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.security.auth.callback.CallbackHandler;
import org.apache.qpid.amqp_1_0.codec.FrameWriter;
import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
import org.apache.qpid.amqp_1_0.framing.AMQFrame;
-import org.apache.qpid.amqp_1_0.framing.FrameHandler;
import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
import org.apache.qpid.amqp_1_0.framing.SASLFrameHandler;
import org.apache.qpid.amqp_1_0.transport.CallbackHandlerSource;
@@ -32,7 +38,6 @@ import org.apache.qpid.amqp_1_0.transport.Container;
import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.FrameBody;
-
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConnectionConfigType;
@@ -43,15 +48,6 @@ import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
-import javax.security.auth.callback.CallbackHandler;
-import java.io.PrintWriter;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.security.Principal;
-import java.util.UUID;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOutputHandler
{
private long _readBytes;
@@ -165,7 +161,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
_network = network;
_sender = sender;
- Container container = new Container();
+ Container container = new Container(_appRegistry.getBrokerId().toString());
_conn = new ConnectionEndpoint(container, asCallbackHandlerSource(ApplicationRegistry.getInstance()
.getAuthenticationManager()));
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 8d227d9677..b3e9a74d04 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -347,10 +347,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
//TODO
// if not durable or close
- if(!TerminusDurability.UNSETTLED_STATE.equals(_durability) ||
- (detach != null && Boolean.TRUE.equals(detach.getClosed())))
+ if(!TerminusDurability.UNSETTLED_STATE.equals(_durability))
{
-
AMQQueue queue = _subscription.getQueue();
try
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index ef298b4731..48a551e42a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -24,11 +24,10 @@ import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
-import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.LifetimePolicy;
-import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.*;
import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Source;
+import org.apache.qpid.amqp_1_0.type.messaging.Target;
import org.apache.qpid.amqp_1_0.type.transaction.Coordinator;
import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
import org.apache.qpid.amqp_1_0.type.transport.*;
@@ -143,13 +142,6 @@ public class Session_1_0 implements SessionEventListener
if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()))
{
linkRegistry.registerSendingLink(endpoint.getName(), sendingLink);
- sendingLink.setCloseAction(new Runnable() {
-
- public void run()
- {
- linkRegistry.unregisterSendingLink(endpoint.getName());
- }
- });
}
}
catch(AmqpErrorException e)
@@ -163,7 +155,19 @@ public class Session_1_0 implements SessionEventListener
}
else
{
- endpoint.setSource(previousLink.getEndpoint().getSource());
+ Source newSource = (Source) endpoint.getSource();
+
+ Source oldSource = (Source) previousLink.getEndpoint().getSource();
+ final TerminusDurability newSourceDurable = newSource == null ? null : newSource.getDurable();
+ if(newSourceDurable != null)
+ {
+ oldSource.setDurable(newSourceDurable);
+ if(newSourceDurable.equals(TerminusDurability.NONE))
+ {
+ linkRegistry.unregisterSendingLink(endpoint.getName());
+ }
+ }
+ endpoint.setSource(oldSource);
SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint;
previousLink.setLinkAttachment(new SendingLinkAttachment(this, sendingLinkEndpoint));
sendingLinkEndpoint.setLinkEventListener(previousLink);