diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2013-05-15 13:15:04 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2013-05-15 13:15:04 +0000 |
commit | 9d5f78ba71ce4503758f97f0eb5c659850a621ae (patch) | |
tree | cd79603ba5da696e7d111c9182ee631f5cb5e000 | |
parent | 62fa41c41ec453be13e4a8e5846139386c3c8c4c (diff) | |
download | qpid-python-9d5f78ba71ce4503758f97f0eb5c659850a621ae.tar.gz |
Merge fixes for JMS AMQP 1-0 client from trunk to 0.22 release
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.22@1482809 13f79535-47bb-0310-9956-ffa450edef68
20 files changed, 528 insertions, 181 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java new file mode 100644 index 0000000000..bc2b6349c8 --- /dev/null +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.amqp_1_0.jms; + +import javax.jms.JMSException; + +public class MessageRejectedException extends JMSException +{ + public MessageRejectedException(String s) + { + super(s); + } +} diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java index 4856a7c491..96788ecd27 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java @@ -45,6 +45,7 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection private String _queuePrefix; private String _topicPrefix; + private boolean _useBinaryMessageId; public ConnectionFactoryImpl(final String host, final int port, @@ -100,6 +101,7 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection ConnectionImpl connection = new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl); connection.setQueuePrefix(_queuePrefix); connection.setTopicPrefix(_topicPrefix); + connection.setUseBinaryMessageId(_useBinaryMessageId); return connection; } @@ -149,6 +151,9 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection String password = null; String clientId = null; String remoteHost = null; + + boolean binaryMessageId = true; + if(userInfo != null) { String[] components = userInfo.split(":",2); @@ -161,22 +166,26 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection String query = url.getQuery(); if(query != null) { - for(String param : query.split("&")) - { - String[] keyValuePair = param.split("=",2); - if(keyValuePair[0].equalsIgnoreCase("clientid")) - { - clientId = keyValuePair[1]; - } - else if(keyValuePair[0].equalsIgnoreCase("ssl")) - { - ssl = Boolean.valueOf(keyValuePair[1]); - } - else if(keyValuePair[0].equalsIgnoreCase("remote-host")) - { - remoteHost = keyValuePair[1]; - } - } + for(String param : query.split("&")) + { + String[] keyValuePair = param.split("=",2); + if(keyValuePair[0].equalsIgnoreCase("clientid")) + { + clientId = keyValuePair[1]; + } + else if(keyValuePair[0].equalsIgnoreCase("ssl")) + { + ssl = Boolean.valueOf(keyValuePair[1]); + } + else if(keyValuePair[0].equalsIgnoreCase("remote-host")) + { + remoteHost = keyValuePair[1]; + } + else if (keyValuePair[0].equalsIgnoreCase("binary-messageid")) + { + binaryMessageId = Boolean.parseBoolean(keyValuePair[1]); + } + } } if(remoteHost == null) @@ -184,7 +193,11 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection remoteHost = host; } - return new ConnectionFactoryImpl(host, port, username, password, clientId, remoteHost, ssl); + ConnectionFactoryImpl connectionFactory = + new ConnectionFactoryImpl(host, port, username, password, clientId, remoteHost, ssl); + connectionFactory.setUseBinaryMessageId(binaryMessageId); + + return connectionFactory; } @@ -235,4 +248,9 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection { _queuePrefix = queuePrefix; } + + public void setUseBinaryMessageId(boolean useBinaryMessageId) + { + _useBinaryMessageId = useBinaryMessageId; + } } diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java index 73701889b5..0ad62fd730 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java @@ -56,7 +56,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect private String _clientId;
private String _queuePrefix;
private String _topicPrefix;
-
+ private boolean _useBinaryMessageId = Boolean.parseBoolean(System.getProperty("qpid.use_binary_message_id", "true"));
private static enum State
{
@@ -163,6 +163,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect connect();
started = true;
}
+
try
{
SessionImpl session = new SessionImpl(this, acknowledgeMode);
@@ -170,6 +171,11 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect session.setTopicSession(_isTopicConnection);
_sessions.add(session);
+ if(_state == State.STARTED)
+ {
+ session.start();
+ }
+
return session;
}
catch(JMSException e)
@@ -191,9 +197,17 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect throw e;
}
}
-
}
+
+ }
+
+ void removeSession(SessionImpl session)
+ {
+ synchronized (_lock)
+ {
+ _sessions.remove(session);
+ }
}
private void reconnect(String networkHost, int port, String hostName)
@@ -410,10 +424,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect public boolean isStarted()
{
- synchronized (_lock)
- {
- return _state == State.STARTED;
- }
+ return _state == State.STARTED;
}
void setQueueConnection(final boolean queueConnection)
@@ -499,4 +510,15 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect return new DecodedDestination(address, kind);
}
+ void setUseBinaryMessageId(boolean useBinaryMessageId)
+ {
+ _useBinaryMessageId = useBinaryMessageId;
+ }
+
+ boolean useBinaryMessageId()
+ {
+ return _useBinaryMessageId;
+ }
+
+
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java index def5ae5931..d2b34e0f13 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.List;
import java.util.Map;
import javax.jms.Destination;
+import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.InvalidSelectorException;
@@ -117,6 +118,29 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi _session = session;
_receiver = createClientReceiver();
+ _receiver.setRemoteErrorListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ final ExceptionListener exceptionListener = _session.getConnection().getExceptionListener();
+
+ if(exceptionListener != null)
+ {
+ final Error receiverError = _receiver.getError();
+ exceptionListener.onException(new JMSException(receiverError.getDescription(),
+ receiverError.getCondition().getValue().toString()));
+
+ }
+ }
+ catch (JMSException e)
+ {
+
+ }
+ }
+ });
}
@@ -125,8 +149,8 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi {
try
{
- return _session.getClientSession(). createReceiver(_session.toAddress(_destination), AcknowledgeMode.ALO,
- _linkName, _durable, getFilters(), null);
+ return _session.getClientSession().createReceiver(_session.toAddress(_destination), AcknowledgeMode.ALO,
+ _linkName, _durable, getFilters(), null);
}
catch (ConnectionErrorException e)
{
@@ -188,15 +212,16 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi {
checkClosed();
_messageListener = messageListener;
- _session.messageListenerSet( this );
_receiver.setMessageArrivalListener(new Receiver.MessageArrivalListener()
- {
+ {
+
+ public void messageArrived(final Receiver receiver)
+ {
+ _session.messageArrived(MessageConsumerImpl.this);
+ }
+ });
+ _session.messageListenerSet( this );
- public void messageArrived(final Receiver receiver)
- {
- _session.messageArrived(MessageConsumerImpl.this);
- }
- });
}
public MessageImpl receive() throws JMSException
@@ -219,12 +244,14 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi return receiveImpl(0L);
}
- private MessageImpl receiveImpl(long timeout) throws IllegalStateException
+ private MessageImpl receiveImpl(long timeout) throws JMSException
{
+
org.apache.qpid.amqp_1_0.client.Message msg;
boolean redelivery;
if(_replaymessages.isEmpty())
{
+ checkReceiverError();
msg = receive0(timeout);
redelivery = false;
}
@@ -241,8 +268,21 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi return createJMSMessage(msg, redelivery);
}
+ void checkReceiverError() throws JMSException
+ {
+ final Error receiverError = _receiver.getError();
+ if(receiverError != null)
+ {
+ JMSException jmsException =
+ new JMSException(receiverError.getDescription(), receiverError.getCondition().toString());
+
+ throw jmsException;
+ }
+ }
+
Message receive0(final long timeout)
{
+
Message message = _receiver.receive(timeout);
if(_session.getAckModeEnum() == Session.AcknowledgeMode.CLIENT_ACKNOWLEDGE)
{
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java index 77544e4112..79c1606edb 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java @@ -19,17 +19,22 @@ package org.apache.qpid.amqp_1_0.jms.impl;
import org.apache.qpid.amqp_1_0.client.ConnectionClosedException;
+import org.apache.qpid.amqp_1_0.client.LinkDetachedException;
import org.apache.qpid.amqp_1_0.client.Sender;
import org.apache.qpid.amqp_1_0.jms.MessageProducer;
+import org.apache.qpid.amqp_1_0.jms.MessageRejectedException;
import org.apache.qpid.amqp_1_0.jms.QueueSender;
import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Outcome;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
import javax.jms.*;
import javax.jms.IllegalStateException;
import java.util.UUID;
+import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.amqp_1_0.type.transport.*;
public class MessageProducerImpl implements MessageProducer, QueueSender, TopicPublisher
{
@@ -43,6 +48,8 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP private SessionImpl _session;
private Sender _sender;
private boolean _closed;
+ private boolean _syncPublish = Boolean.getBoolean("qpid.sync_publish");
+ private long _syncPublishTimeout = Long.getLong("qpid.sync_publish_timeout", 30000l);
protected MessageProducerImpl(final Destination destination,
final SessionImpl session) throws JMSException
@@ -81,6 +88,29 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP jmsEx.setLinkedException(e);
throw jmsEx;
}
+ _sender.setRemoteErrorListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ final ExceptionListener exceptionListener = _session.getConnection().getExceptionListener();
+
+ if(exceptionListener != null)
+ {
+ final org.apache.qpid.amqp_1_0.type.transport.Error receiverError = _sender.getError();
+ exceptionListener.onException(new JMSException(receiverError.getDescription(),
+ receiverError.getCondition().getValue().toString()));
+
+ }
+ }
+ catch (JMSException e)
+ {
+
+ }
+ }
+ });
}
}
@@ -234,7 +264,7 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP if(!getDisableMessageID() && msg.getMessageId() == null)
{
- final Binary messageId = generateMessageId();
+ final Object messageId = generateMessageId();
msg.setMessageId(messageId);
}
@@ -251,7 +281,28 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP final org.apache.qpid.amqp_1_0.client.Message clientMessage = new org.apache.qpid.amqp_1_0.client.Message(msg.getSections());
- _sender.send(clientMessage, _session.getTxn());
+ DispositionAction action = null;
+
+ if(_syncPublish)
+ {
+ action = new DispositionAction(_sender);
+ }
+
+ try
+ {
+ _sender.send(clientMessage, _session.getTxn(), action);
+ }
+ catch (LinkDetachedException e)
+ {
+ JMSException jmsException = new InvalidDestinationException("Sender has been closed");
+ jmsException.setLinkedException(e);
+ throw jmsException;
+ }
+
+ if(_syncPublish && !action.wasAccepted(_syncPublishTimeout + System.currentTimeMillis()))
+ {
+ throw new MessageRejectedException("Message was rejected");
+ }
if(getDestination() != null)
{
@@ -270,10 +321,11 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP send((Destination)queue, message, deliveryMode, priority, ttl);
}
- private Binary generateMessageId()
+ private Object generateMessageId()
{
UUID uuid = UUID.randomUUID();
- return new Binary(uuid.toString().getBytes());
+ final String messageIdString = uuid.toString();
+ return _session.getConnection().useBinaryMessageId() ? new Binary(messageIdString.getBytes()) : messageIdString;
}
public void send(final Destination destination, final Message message) throws JMSException
@@ -377,4 +429,61 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP {
send(topic, message, deliveryMode, priority, ttl);
}
+
+ private static class DispositionAction implements Sender.OutcomeAction
+ {
+ private final Sender _sender;
+ private final Object _lock;
+ private Outcome _outcome;
+
+ public DispositionAction(Sender sender)
+ {
+ _sender = sender;
+ _lock = sender.getEndpoint().getLock();
+ }
+
+ @Override
+ public void onOutcome(Binary deliveryTag, Outcome outcome)
+ {
+ synchronized (_lock)
+ {
+ _outcome = outcome;
+ _lock.notifyAll();
+ }
+ }
+
+ public boolean wasAccepted(long timeout) throws JMSException
+ {
+ synchronized(_lock)
+ {
+ while(_outcome == null && !_sender.getEndpoint().isDetached())
+ {
+ try
+ {
+ _lock.wait(timeout - System.currentTimeMillis());
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+ if(_outcome == null)
+ {
+
+ if(_sender.getEndpoint().isDetached())
+ {
+ throw new JMSException("Link was detached");
+ }
+ else
+ {
+ throw new JMSException("Timed out waiting for message acceptance");
+ }
+ }
+ else
+ {
+ return _outcome instanceof Accepted;
+ }
+ }
+ }
+ }
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java index ba487cc3f6..2ae67913fe 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java @@ -21,10 +21,12 @@ package org.apache.qpid.amqp_1_0.jms.impl; import java.io.Serializable;
import java.util.ArrayList;
import java.util.Enumeration;
+import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import javax.jms.BytesMessage;
import javax.jms.Destination;
+import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
@@ -52,9 +54,11 @@ import org.apache.qpid.amqp_1_0.jms.TemporaryDestination; import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
import org.apache.qpid.amqp_1_0.jms.TopicSession;
import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
+import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
import org.apache.qpid.amqp_1_0.type.messaging.Source;
import org.apache.qpid.amqp_1_0.type.messaging.Target;
-import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
public class SessionImpl implements Session, QueueSession, TopicSession
{
@@ -90,6 +94,45 @@ public class SessionImpl implements Session, QueueSession, TopicSession jmsException.setLinkedException(e);
throw jmsException;
}
+ _session.getEndpoint().setSessionEventListener(new SessionEventListener.DefaultSessionEventListener()
+ {
+ @Override
+ public void remoteEnd(End end)
+ {
+ if(!_closed)
+ {
+ try
+ {
+ close();
+ }
+ catch (JMSException e)
+ {
+ }
+ try
+ {
+ final Error error = end.getError();
+ final ExceptionListener exceptionListener = _connection.getExceptionListener();
+ if(exceptionListener != null)
+ {
+ if(error != null)
+ {
+ exceptionListener.onException(new JMSException(error.getDescription(),
+ error.getCondition().getValue().toString()));
+ }
+ else
+ {
+ exceptionListener.onException(new JMSException("Session remotely closed"));
+ }
+ }
+ }
+ catch (JMSException e)
+ {
+
+ }
+
+ }
+ }
+ });
if(_acknowledgeMode == AcknowledgeMode.SESSION_TRANSACTED)
{
_txn = _session.createSessionLocalTransaction();
@@ -230,6 +273,7 @@ public class SessionImpl implements Session, QueueSession, TopicSession producer.close();
}
_session.close();
+ _connection.removeSession(this);
}
}
@@ -765,7 +809,6 @@ public class SessionImpl implements Session, QueueSession, TopicSession return _txn;
}
-
private class Dispatcher implements Runnable
{
@@ -816,7 +859,6 @@ public class SessionImpl implements Session, QueueSession, TopicSession msg = consumer.receive0(0L);
}
-
MessageListener listener = consumer._messageListener;
MessageImpl message = consumer.createJMSMessage(msg, recoveredMessage);
@@ -847,7 +889,28 @@ public class SessionImpl implements Session, QueueSession, TopicSession }
}
+ Iterator<MessageConsumerImpl> consumers = _consumers.iterator();
+ while(consumers.hasNext())
+ {
+ MessageConsumerImpl consumer = consumers.next();
+ try
+ {
+ consumer.checkReceiverError();
+ }
+ catch (JMSException e)
+ {
+ consumers.remove();
+ try
+ {
+ _connection.getExceptionListener().onException(e);
+ consumer.close();
+ }
+ catch (JMSException e1)
+ {
+ }
+ }
+ }
}
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java index 76608c421b..2c48a6b20f 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java @@ -34,6 +34,7 @@ public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue private SessionImpl _session;
private final Set<MessageConsumer> _consumers =
Collections.synchronizedSet(new HashSet<MessageConsumer>());
+ private boolean _deleted;
protected TemporaryQueueImpl(String address, Sender sender, SessionImpl session)
{
@@ -56,7 +57,8 @@ public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue {
if(_consumers.isEmpty())
{
- close();
+ close();
+ _deleted = true;
}
else
{
@@ -100,6 +102,6 @@ public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue public boolean isDeleted()
{
- return _sender == null;
+ return _deleted;
}
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java index 8e0d07e78b..3ac70a29f2 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java @@ -34,6 +34,7 @@ public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic private SessionImpl _session;
private final Set<MessageConsumer> _consumers =
Collections.synchronizedSet(new HashSet<MessageConsumer>());
+ private boolean _deleted;
protected TemporaryTopicImpl(String address, Sender sender, SessionImpl session)
{
@@ -57,6 +58,7 @@ public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic {
if(_consumers.isEmpty())
{
+ _deleted = true;
close();
}
else
@@ -105,6 +107,6 @@ public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic public boolean isDeleted()
{
- return _sender == null;
+ return _deleted;
}
}
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java index c3193f9fea..5e77b7097c 100644 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java +++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java @@ -173,7 +173,14 @@ public class Demo extends Util Section[] sections = { properties, appProperties, amqpValue};
final Message message1 = new Message(Arrays.asList(sections));
- s.send(message1);
+ try
+ {
+ s.send(message1);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
Map<Object, Sender> sendingLinks = new HashMap<Object, Sender>();
Map<Object, Receiver> receivingLinks = new HashMap<Object, Receiver>();
@@ -295,7 +302,14 @@ public class Demo extends Util m2propmap.put(VENDOR, vendor);
ApplicationProperties m2appProps = new ApplicationProperties(m2propmap);
Message m2 = new Message(Arrays.asList(m2props, m2appProps, new AmqpValue("AMQP-"+messageId)));
- sender.send(m2);
+ try
+ {
+ sender.send(m2);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
Map m3propmap = new HashMap();
m3propmap.put(OPCODE, LOG);
@@ -307,8 +321,14 @@ public class Demo extends Util Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap),
new AmqpValue("AMQP-"+messageId)));
- s.send(m3);
-
+ try
+ {
+ s.send(m3);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
}
responseReceiver.acknowledge(m);
@@ -336,7 +356,14 @@ public class Demo extends Util Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap),
new AmqpValue("AMQP-"+mp.getMessageId())));
- s.send(m3);
+ try
+ {
+ s.send(m3);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
entry.getValue().acknowledge(m);
}
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java index 998d68cfa6..06440b8f19 100644 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java +++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java @@ -121,14 +121,7 @@ public class Dump extends Util session.close(); conn.close(); - } catch (ConnectionException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - catch (Sender.SenderCreationException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } catch (Sender.SenderClosingException e) + } catch (Exception e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java index 37550ea52f..c7bcd99312 100644 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java +++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java @@ -248,27 +248,10 @@ public class Filesender extends Util session.close(); conn.close(); } - catch (ConnectionException e) + catch (Exception e) { e.printStackTrace(); } - catch (Sender.SenderCreationException e) - { - e.printStackTrace(); - } catch (FileNotFoundException e) - { - e.printStackTrace(); - } catch (IOException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } catch (NoSuchAlgorithmException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } catch (Sender.SenderClosingException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } private Message createMessageFromFile(MessageDigest md5, String fileName, File file) throws IOException diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java index d8da58dc76..dbe273182f 100644 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java +++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java @@ -216,23 +216,10 @@ public class Request extends Util conn2.close();
}
}
- catch (ConnectionException e)
+ catch (Exception e)
{
e.printStackTrace(); //TODO.
}
- catch (Sender.SenderClosingException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderCreationException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (AmqpErrorException e)
- {
- e.printStackTrace(); //TODO.
- }
-
}
protected boolean hasSingleLinkPerConnectionMode()
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java index 6b1b70476e..1e4bcfc7d7 100644 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java +++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java @@ -274,21 +274,13 @@ public class Respond extends Util _conn.close(); System.out.println("Received: " + receivedCount); } - catch (ConnectionException e) - { - e.printStackTrace(); //TODO. - } - catch (Sender.SenderClosingException e) - { - e.printStackTrace(); //TODO. - } - catch (Sender.SenderCreationException e) + catch (Exception e) { e.printStackTrace(); //TODO. } } - private void respond(Message m) throws Sender.SenderCreationException, ConnectionClosedException + private void respond(Message m) throws Sender.SenderCreationException, ConnectionClosedException, LinkDetachedException { List<Section> sections = m.getPayload(); String replyTo = null; diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java index ef1a31005c..b4ae16ab3f 100644 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java +++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java @@ -219,19 +219,10 @@ public class Send extends Util session.close();
conn.close();
}
- catch (Sender.SenderClosingException e)
+ catch (Exception e)
{
e.printStackTrace(); //TODO.
}
- catch (ConnectionException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderCreationException e)
- {
- e.printStackTrace(); //TODO.
- }
-
}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java new file mode 100644 index 0000000000..45b00255f2 --- /dev/null +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.type.transport.Error; + +public class LinkDetachedException extends Exception +{ + private final org.apache.qpid.amqp_1_0.type.transport.Error _remoteError; + + public LinkDetachedException(Error remoteError) + { + super(); + _remoteError = remoteError; + } + + public org.apache.qpid.amqp_1_0.type.transport.Error getRemoteError() + { + return _remoteError; + } + +} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java index 6996171707..596931088f 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java @@ -50,6 +50,7 @@ public class Receiver implements DeliveryStateHandler private Map<Binary, SettledAction> _unsettledMap = new HashMap<Binary, SettledAction>();
private MessageArrivalListener _messageArrivalListener;
private org.apache.qpid.amqp_1_0.type.transport.Error _error;
+ private Runnable _remoteErrorTask;
public Receiver(final Session session,
final String linkName,
@@ -125,6 +126,10 @@ public class Receiver implements DeliveryStateHandler public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
{
_error = detach.getError();
+ if(detach.getError()!=null)
+ {
+ remoteError();
+ }
super.remoteDetached(endpoint, detach);
}
});
@@ -171,6 +176,14 @@ public class Receiver implements DeliveryStateHandler }
}
+ private void remoteError()
+ {
+ if(_remoteErrorTask != null)
+ {
+ _remoteErrorTask.run();
+ }
+ }
+
private void postPrefetchAction()
{
if(_messageArrivalListener != null)
@@ -566,6 +579,11 @@ public class Receiver implements DeliveryStateHandler synchronized(_endpoint.getLock())
{
_messageArrivalListener = messageArrivalListener;
+ int prefetchSize = _prefetchQueue.size();
+ for(int i = 0; i < prefetchSize; i++)
+ {
+ postPrefetchAction();
+ }
}
}
@@ -590,4 +608,8 @@ public class Receiver implements DeliveryStateHandler void messageArrived(Receiver receiver);
}
+ public void setRemoteErrorListener(Runnable listener)
+ {
+ _remoteErrorTask = listener;
+ }
}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java index cf9f44af75..0feaa48805 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java @@ -22,7 +22,9 @@ package org.apache.qpid.amqp_1_0.client; import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
import org.apache.qpid.amqp_1_0.type.*;
import org.apache.qpid.amqp_1_0.type.Source;
import org.apache.qpid.amqp_1_0.type.Target;
@@ -35,6 +37,7 @@ import java.util.Collections; import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
public class Sender implements DeliveryStateHandler
{
@@ -44,6 +47,8 @@ public class Sender implements DeliveryStateHandler private int _windowSize;
private Map<Binary, OutcomeAction> _outcomeActions = Collections.synchronizedMap(new HashMap<Binary, OutcomeAction>());
private boolean _closed;
+ private Error _error;
+ private Runnable _remoteErrorTask;
public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr)
throws SenderCreationException, ConnectionClosedException
@@ -166,6 +171,21 @@ public class Sender implements DeliveryStateHandler throw new SenderCreationException("Peer did not create remote endpoint for link, target: " + target.getAddress());
};
}
+
+ _endpoint.setLinkEventListener(new SendingLinkListener.DefaultLinkEventListener()
+ {
+
+ @Override
+ public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
+ {
+ _error = detach.getError();
+ if(_error != null)
+ {
+ remoteError();
+ }
+ super.remoteDetached(endpoint, detach);
+ }
+ });
}
public Source getSource()
@@ -178,22 +198,22 @@ public class Sender implements DeliveryStateHandler return _endpoint.getTarget();
}
- public void send(Message message)
+ public void send(Message message) throws LinkDetachedException
{
send(message, null, null);
}
- public void send(Message message, final OutcomeAction action)
+ public void send(Message message, final OutcomeAction action) throws LinkDetachedException
{
send(message, null, action);
}
- public void send(Message message, final Transaction txn)
+ public void send(Message message, final Transaction txn) throws LinkDetachedException
{
send(message, txn, null);
}
- public void send(Message message, final Transaction txn, OutcomeAction action)
+ public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException
{
List<Section> sections = message.getPayload();
@@ -245,7 +265,7 @@ public class Sender implements DeliveryStateHandler final Object lock = _endpoint.getLock();
synchronized(lock)
{
- while(!_endpoint.hasCreditToSend())
+ while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached())
{
try
{
@@ -256,6 +276,10 @@ public class Sender implements DeliveryStateHandler e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
+ if(_endpoint.isDetached())
+ {
+ throw new LinkDetachedException(_error);
+ }
if(action != null)
{
_outcomeActions.put(message.getDeliveryTag(), action);
@@ -352,6 +376,21 @@ public class Sender implements DeliveryStateHandler _endpoint.updateDisposition(deliveryTag, state, true);
}
}
+ else if(state instanceof TransactionalState)
+ {
+ OutcomeAction action;
+
+ if((action = _outcomeActions.remove(deliveryTag)) != null)
+ {
+ action.onOutcome(deliveryTag, ((TransactionalState) state).getOutcome());
+ }
+
+ }
+ }
+
+ public SendingLinkEndpoint getEndpoint()
+ {
+ return _endpoint;
}
public Map<Binary, DeliveryState> getRemoteUnsettled()
@@ -364,6 +403,26 @@ public class Sender implements DeliveryStateHandler return _session;
}
+
+ private void remoteError()
+ {
+ if(_remoteErrorTask != null)
+ {
+ _remoteErrorTask.run();
+ }
+ }
+
+
+ public void setRemoteErrorListener(Runnable listener)
+ {
+ _remoteErrorTask = listener;
+ }
+
+ public Error getError()
+ {
+ return _error;
+ }
+
public class SenderCreationException extends Exception
{
public SenderCreationException(Throwable e)
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java index 73c4d4bd95..8e09e093e0 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java @@ -31,7 +31,6 @@ public abstract class CompoundWriter<V> implements ValueWriter<V> private Registry _registry; private static final int LARGE_COMPOUND_THRESHOLD_COUNT = 25; private ValueWriter _delegate; - private Map<Class, ValueWriter> _writerCache = new HashMap<Class, ValueWriter>(); public CompoundWriter(final Registry registry) { @@ -305,7 +304,7 @@ public abstract class CompoundWriter<V> implements ValueWriter<V> for(int i = 0; i < getCount(); i++) { Object val = next(); - ValueWriter writer = _registry.getValueWriter(val, _writerCache); + ValueWriter writer = _registry.getValueWriter(val); if(writer == null) { // TODO diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/UnsignedLongWriter.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/UnsignedLongWriter.java index 93e0fea740..6b77620e5e 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/UnsignedLongWriter.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/UnsignedLongWriter.java @@ -127,7 +127,7 @@ public class UnsignedLongWriter implements ValueWriter<UnsignedLong> { _delegate = _zeroByteWriter; } - else if(ulong.compareTo(UnsignedLong.valueOf(256))<0) + else if((ulong.longValue() & 0xffL) == ulong.longValue()) { _delegate = _oneByteWriter; } diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/codec/AMQPDescribedTypeRegistry.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/codec/AMQPDescribedTypeRegistry.java index 4ee0fd1f70..82e6216c21 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/codec/AMQPDescribedTypeRegistry.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/codec/AMQPDescribedTypeRegistry.java @@ -298,102 +298,71 @@ public class AMQPDescribedTypeRegistry implements DescribedTypeConstructorRegist private final Map<Class, ValueWriter.Factory> _writerMap = new HashMap<Class, ValueWriter.Factory>(); - private final Map<Class, ValueWriter> _cachedWriters = new HashMap<Class,ValueWriter>(); public <V extends Object> ValueWriter<V> getValueWriter(V value, Map<Class, ValueWriter> localCache) { - Class<? extends Object> clazz = value == null ? Void.TYPE : value.getClass(); - ValueWriter writer = null; // TODO localCache.get(clazz); - if(writer == null || !writer.isComplete()) - { - writer = getValueWriter(value); - localCache.put(clazz, writer); - } - else - { - writer.setValue(value); - } + return getValueWriter(value); - - return writer; } public <V extends Object> ValueWriter<V> getValueWriter(V value) { + ValueWriter writer; Class<? extends Object> clazz = value == null ? Void.TYPE : value.getClass(); - ValueWriter writer = null; // TODO _cachedWriters.get(clazz); - if(writer == null || !writer.isComplete()) + ValueWriter.Factory<V> factory = (ValueWriter.Factory<V>) (_writerMap.get(clazz)); + + if(factory == null) { - ValueWriter.Factory<V> factory = (ValueWriter.Factory<V>) (_writerMap.get(clazz)); + if(value instanceof List) + { + factory = _writerMap.get(List.class); + _writerMap.put(value.getClass(), factory); + writer = factory.newInstance(this); + writer.setValue(value); - if(factory == null) + } + else if(value instanceof Map) { - if(value instanceof List) - { - factory = _writerMap.get(List.class); - _writerMap.put(value.getClass(), factory); - writer = factory.newInstance(this); - if(writer.isCacheable()) - { - _cachedWriters.put(clazz, writer); - } - writer.setValue(value); + factory = _writerMap.get(Map.class); + _writerMap.put(value.getClass(), factory); + writer = factory.newInstance(this); + writer.setValue(value); - } - else if(value instanceof Map) + } + else if(value.getClass().isArray()) + { + if(RestrictedType.class.isAssignableFrom(value.getClass().getComponentType())) { - factory = _writerMap.get(Map.class); - _writerMap.put(value.getClass(), factory); - writer = factory.newInstance(this); - if(writer.isCacheable()) + RestrictedType[] restrictedTypes = (RestrictedType[]) value; + Object[] newVals = (Object[]) Array.newInstance(restrictedTypes[0].getValue().getClass(), + restrictedTypes.length); + for(int i = 0; i < restrictedTypes.length; i++) { - _cachedWriters.put(clazz, writer); + newVals[i] = restrictedTypes[i].getValue(); } - writer.setValue(value); - + return (ValueWriter<V>) getValueWriter(newVals); } - else if(value.getClass().isArray()) - { - if(RestrictedType.class.isAssignableFrom(value.getClass().getComponentType())) - { - RestrictedType[] restrictedTypes = (RestrictedType[]) value; - Object[] newVals = (Object[]) Array.newInstance(restrictedTypes[0].getValue().getClass(), - restrictedTypes.length); - for(int i = 0; i < restrictedTypes.length; i++) - { - newVals[i] = restrictedTypes[i].getValue(); - } - return (ValueWriter<V>) getValueWriter(newVals); - } - // TODO primitive array types - factory = _writerMap.get(List.class); - writer = factory.newInstance(this); - writer.setValue(Arrays.asList((Object[])value)); + // TODO primitive array types + factory = _writerMap.get(List.class); + writer = factory.newInstance(this); + writer.setValue(Arrays.asList((Object[])value)); - } - else - { - return null; - } } else { - writer = factory.newInstance(this); - if(writer.isCacheable()) - { - _cachedWriters.put(clazz, writer); - } - writer.setValue(value); + return null; } } else { + writer = factory.newInstance(this); writer.setValue(value); } + return writer; } |