From 9d5f78ba71ce4503758f97f0eb5c659850a621ae Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 15 May 2013 13:15:04 +0000 Subject: Merge fixes for JMS AMQP 1-0 client from trunk to 0.22 release git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.22@1482809 13f79535-47bb-0310-9956-ffa450edef68 --- .../amqp_1_0/jms/MessageRejectedException.java | 29 +++++ .../amqp_1_0/jms/impl/ConnectionFactoryImpl.java | 52 ++++++--- .../qpid/amqp_1_0/jms/impl/ConnectionImpl.java | 34 ++++-- .../amqp_1_0/jms/impl/MessageConsumerImpl.java | 60 +++++++++-- .../amqp_1_0/jms/impl/MessageProducerImpl.java | 117 ++++++++++++++++++++- .../apache/qpid/amqp_1_0/jms/impl/SessionImpl.java | 69 +++++++++++- .../qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java | 6 +- .../qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java | 4 +- .../java/org/apache/qpid/amqp_1_0/client/Demo.java | 37 ++++++- .../java/org/apache/qpid/amqp_1_0/client/Dump.java | 9 +- .../apache/qpid/amqp_1_0/client/Filesender.java | 19 +--- .../org/apache/qpid/amqp_1_0/client/Request.java | 15 +-- .../org/apache/qpid/amqp_1_0/client/Respond.java | 12 +-- .../java/org/apache/qpid/amqp_1_0/client/Send.java | 11 +- .../amqp_1_0/client/LinkDetachedException.java | 40 +++++++ .../org/apache/qpid/amqp_1_0/client/Receiver.java | 22 ++++ .../org/apache/qpid/amqp_1_0/client/Sender.java | 69 +++++++++++- .../apache/qpid/amqp_1_0/codec/CompoundWriter.java | 3 +- .../qpid/amqp_1_0/codec/UnsignedLongWriter.java | 2 +- .../type/codec/AMQPDescribedTypeRegistry.java | 99 ++++++----------- 20 files changed, 528 insertions(+), 181 deletions(-) create mode 100644 qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java create mode 100644 qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java new file mode 100644 index 0000000000..bc2b6349c8 --- /dev/null +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.amqp_1_0.jms; + +import javax.jms.JMSException; + +public class MessageRejectedException extends JMSException +{ + public MessageRejectedException(String s) + { + super(s); + } +} diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java index 4856a7c491..96788ecd27 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java @@ -45,6 +45,7 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection private String _queuePrefix; private String _topicPrefix; + private boolean _useBinaryMessageId; public ConnectionFactoryImpl(final String host, final int port, @@ -100,6 +101,7 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection ConnectionImpl connection = new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl); connection.setQueuePrefix(_queuePrefix); connection.setTopicPrefix(_topicPrefix); + connection.setUseBinaryMessageId(_useBinaryMessageId); return connection; } @@ -149,6 +151,9 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection String password = null; String clientId = null; String remoteHost = null; + + boolean binaryMessageId = true; + if(userInfo != null) { String[] components = userInfo.split(":",2); @@ -161,22 +166,26 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection String query = url.getQuery(); if(query != null) { - for(String param : query.split("&")) - { - String[] keyValuePair = param.split("=",2); - if(keyValuePair[0].equalsIgnoreCase("clientid")) - { - clientId = keyValuePair[1]; - } - else if(keyValuePair[0].equalsIgnoreCase("ssl")) - { - ssl = Boolean.valueOf(keyValuePair[1]); - } - else if(keyValuePair[0].equalsIgnoreCase("remote-host")) - { - remoteHost = keyValuePair[1]; - } - } + for(String param : query.split("&")) + { + String[] keyValuePair = param.split("=",2); + if(keyValuePair[0].equalsIgnoreCase("clientid")) + { + clientId = keyValuePair[1]; + } + else if(keyValuePair[0].equalsIgnoreCase("ssl")) + { + ssl = Boolean.valueOf(keyValuePair[1]); + } + else if(keyValuePair[0].equalsIgnoreCase("remote-host")) + { + remoteHost = keyValuePair[1]; + } + else if (keyValuePair[0].equalsIgnoreCase("binary-messageid")) + { + binaryMessageId = Boolean.parseBoolean(keyValuePair[1]); + } + } } if(remoteHost == null) @@ -184,7 +193,11 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection remoteHost = host; } - return new ConnectionFactoryImpl(host, port, username, password, clientId, remoteHost, ssl); + ConnectionFactoryImpl connectionFactory = + new ConnectionFactoryImpl(host, port, username, password, clientId, remoteHost, ssl); + connectionFactory.setUseBinaryMessageId(binaryMessageId); + + return connectionFactory; } @@ -235,4 +248,9 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection { _queuePrefix = queuePrefix; } + + public void setUseBinaryMessageId(boolean useBinaryMessageId) + { + _useBinaryMessageId = useBinaryMessageId; + } } diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java index 73701889b5..0ad62fd730 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java @@ -56,7 +56,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect private String _clientId; private String _queuePrefix; private String _topicPrefix; - + private boolean _useBinaryMessageId = Boolean.parseBoolean(System.getProperty("qpid.use_binary_message_id", "true")); private static enum State { @@ -163,6 +163,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect connect(); started = true; } + try { SessionImpl session = new SessionImpl(this, acknowledgeMode); @@ -170,6 +171,11 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect session.setTopicSession(_isTopicConnection); _sessions.add(session); + if(_state == State.STARTED) + { + session.start(); + } + return session; } catch(JMSException e) @@ -191,9 +197,17 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect throw e; } } - } + + } + + void removeSession(SessionImpl session) + { + synchronized (_lock) + { + _sessions.remove(session); + } } private void reconnect(String networkHost, int port, String hostName) @@ -410,10 +424,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect public boolean isStarted() { - synchronized (_lock) - { - return _state == State.STARTED; - } + return _state == State.STARTED; } void setQueueConnection(final boolean queueConnection) @@ -499,4 +510,15 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect return new DecodedDestination(address, kind); } + void setUseBinaryMessageId(boolean useBinaryMessageId) + { + _useBinaryMessageId = useBinaryMessageId; + } + + boolean useBinaryMessageId() + { + return _useBinaryMessageId; + } + + } diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java index def5ae5931..d2b34e0f13 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import javax.jms.Destination; +import javax.jms.ExceptionListener; import javax.jms.IllegalStateException; import javax.jms.InvalidDestinationException; import javax.jms.InvalidSelectorException; @@ -117,6 +118,29 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi _session = session; _receiver = createClientReceiver(); + _receiver.setRemoteErrorListener(new Runnable() + { + @Override + public void run() + { + try + { + final ExceptionListener exceptionListener = _session.getConnection().getExceptionListener(); + + if(exceptionListener != null) + { + final Error receiverError = _receiver.getError(); + exceptionListener.onException(new JMSException(receiverError.getDescription(), + receiverError.getCondition().getValue().toString())); + + } + } + catch (JMSException e) + { + + } + } + }); } @@ -125,8 +149,8 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi { try { - return _session.getClientSession(). createReceiver(_session.toAddress(_destination), AcknowledgeMode.ALO, - _linkName, _durable, getFilters(), null); + return _session.getClientSession().createReceiver(_session.toAddress(_destination), AcknowledgeMode.ALO, + _linkName, _durable, getFilters(), null); } catch (ConnectionErrorException e) { @@ -188,15 +212,16 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi { checkClosed(); _messageListener = messageListener; - _session.messageListenerSet( this ); _receiver.setMessageArrivalListener(new Receiver.MessageArrivalListener() - { + { + + public void messageArrived(final Receiver receiver) + { + _session.messageArrived(MessageConsumerImpl.this); + } + }); + _session.messageListenerSet( this ); - public void messageArrived(final Receiver receiver) - { - _session.messageArrived(MessageConsumerImpl.this); - } - }); } public MessageImpl receive() throws JMSException @@ -219,12 +244,14 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi return receiveImpl(0L); } - private MessageImpl receiveImpl(long timeout) throws IllegalStateException + private MessageImpl receiveImpl(long timeout) throws JMSException { + org.apache.qpid.amqp_1_0.client.Message msg; boolean redelivery; if(_replaymessages.isEmpty()) { + checkReceiverError(); msg = receive0(timeout); redelivery = false; } @@ -241,8 +268,21 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi return createJMSMessage(msg, redelivery); } + void checkReceiverError() throws JMSException + { + final Error receiverError = _receiver.getError(); + if(receiverError != null) + { + JMSException jmsException = + new JMSException(receiverError.getDescription(), receiverError.getCondition().toString()); + + throw jmsException; + } + } + Message receive0(final long timeout) { + Message message = _receiver.receive(timeout); if(_session.getAckModeEnum() == Session.AcknowledgeMode.CLIENT_ACKNOWLEDGE) { diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java index 77544e4112..79c1606edb 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java @@ -19,17 +19,22 @@ package org.apache.qpid.amqp_1_0.jms.impl; import org.apache.qpid.amqp_1_0.client.ConnectionClosedException; +import org.apache.qpid.amqp_1_0.client.LinkDetachedException; import org.apache.qpid.amqp_1_0.client.Sender; import org.apache.qpid.amqp_1_0.jms.MessageProducer; +import org.apache.qpid.amqp_1_0.jms.MessageRejectedException; import org.apache.qpid.amqp_1_0.jms.QueueSender; import org.apache.qpid.amqp_1_0.jms.TemporaryDestination; import org.apache.qpid.amqp_1_0.jms.TopicPublisher; import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.Outcome; import org.apache.qpid.amqp_1_0.type.UnsignedInteger; import javax.jms.*; import javax.jms.IllegalStateException; import java.util.UUID; +import org.apache.qpid.amqp_1_0.type.messaging.Accepted; +import org.apache.qpid.amqp_1_0.type.transport.*; public class MessageProducerImpl implements MessageProducer, QueueSender, TopicPublisher { @@ -43,6 +48,8 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP private SessionImpl _session; private Sender _sender; private boolean _closed; + private boolean _syncPublish = Boolean.getBoolean("qpid.sync_publish"); + private long _syncPublishTimeout = Long.getLong("qpid.sync_publish_timeout", 30000l); protected MessageProducerImpl(final Destination destination, final SessionImpl session) throws JMSException @@ -81,6 +88,29 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP jmsEx.setLinkedException(e); throw jmsEx; } + _sender.setRemoteErrorListener(new Runnable() + { + @Override + public void run() + { + try + { + final ExceptionListener exceptionListener = _session.getConnection().getExceptionListener(); + + if(exceptionListener != null) + { + final org.apache.qpid.amqp_1_0.type.transport.Error receiverError = _sender.getError(); + exceptionListener.onException(new JMSException(receiverError.getDescription(), + receiverError.getCondition().getValue().toString())); + + } + } + catch (JMSException e) + { + + } + } + }); } } @@ -234,7 +264,7 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP if(!getDisableMessageID() && msg.getMessageId() == null) { - final Binary messageId = generateMessageId(); + final Object messageId = generateMessageId(); msg.setMessageId(messageId); } @@ -251,7 +281,28 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP final org.apache.qpid.amqp_1_0.client.Message clientMessage = new org.apache.qpid.amqp_1_0.client.Message(msg.getSections()); - _sender.send(clientMessage, _session.getTxn()); + DispositionAction action = null; + + if(_syncPublish) + { + action = new DispositionAction(_sender); + } + + try + { + _sender.send(clientMessage, _session.getTxn(), action); + } + catch (LinkDetachedException e) + { + JMSException jmsException = new InvalidDestinationException("Sender has been closed"); + jmsException.setLinkedException(e); + throw jmsException; + } + + if(_syncPublish && !action.wasAccepted(_syncPublishTimeout + System.currentTimeMillis())) + { + throw new MessageRejectedException("Message was rejected"); + } if(getDestination() != null) { @@ -270,10 +321,11 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP send((Destination)queue, message, deliveryMode, priority, ttl); } - private Binary generateMessageId() + private Object generateMessageId() { UUID uuid = UUID.randomUUID(); - return new Binary(uuid.toString().getBytes()); + final String messageIdString = uuid.toString(); + return _session.getConnection().useBinaryMessageId() ? new Binary(messageIdString.getBytes()) : messageIdString; } public void send(final Destination destination, final Message message) throws JMSException @@ -377,4 +429,61 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP { send(topic, message, deliveryMode, priority, ttl); } + + private static class DispositionAction implements Sender.OutcomeAction + { + private final Sender _sender; + private final Object _lock; + private Outcome _outcome; + + public DispositionAction(Sender sender) + { + _sender = sender; + _lock = sender.getEndpoint().getLock(); + } + + @Override + public void onOutcome(Binary deliveryTag, Outcome outcome) + { + synchronized (_lock) + { + _outcome = outcome; + _lock.notifyAll(); + } + } + + public boolean wasAccepted(long timeout) throws JMSException + { + synchronized(_lock) + { + while(_outcome == null && !_sender.getEndpoint().isDetached()) + { + try + { + _lock.wait(timeout - System.currentTimeMillis()); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } + } + if(_outcome == null) + { + + if(_sender.getEndpoint().isDetached()) + { + throw new JMSException("Link was detached"); + } + else + { + throw new JMSException("Timed out waiting for message acceptance"); + } + } + else + { + return _outcome instanceof Accepted; + } + } + } + } } diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java index ba487cc3f6..2ae67913fe 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java @@ -21,10 +21,12 @@ package org.apache.qpid.amqp_1_0.jms.impl; import java.io.Serializable; import java.util.ArrayList; import java.util.Enumeration; +import java.util.Iterator; import java.util.List; import java.util.UUID; import javax.jms.BytesMessage; import javax.jms.Destination; +import javax.jms.ExceptionListener; import javax.jms.IllegalStateException; import javax.jms.InvalidDestinationException; import javax.jms.JMSException; @@ -52,9 +54,11 @@ import org.apache.qpid.amqp_1_0.jms.TemporaryDestination; import org.apache.qpid.amqp_1_0.jms.TopicPublisher; import org.apache.qpid.amqp_1_0.jms.TopicSession; import org.apache.qpid.amqp_1_0.jms.TopicSubscriber; +import org.apache.qpid.amqp_1_0.transport.SessionEventListener; import org.apache.qpid.amqp_1_0.type.messaging.Source; import org.apache.qpid.amqp_1_0.type.messaging.Target; -import org.apache.qpid.amqp_1_0.type.transport.AmqpError; +import org.apache.qpid.amqp_1_0.type.transport.*; +import org.apache.qpid.amqp_1_0.type.transport.Error; public class SessionImpl implements Session, QueueSession, TopicSession { @@ -90,6 +94,45 @@ public class SessionImpl implements Session, QueueSession, TopicSession jmsException.setLinkedException(e); throw jmsException; } + _session.getEndpoint().setSessionEventListener(new SessionEventListener.DefaultSessionEventListener() + { + @Override + public void remoteEnd(End end) + { + if(!_closed) + { + try + { + close(); + } + catch (JMSException e) + { + } + try + { + final Error error = end.getError(); + final ExceptionListener exceptionListener = _connection.getExceptionListener(); + if(exceptionListener != null) + { + if(error != null) + { + exceptionListener.onException(new JMSException(error.getDescription(), + error.getCondition().getValue().toString())); + } + else + { + exceptionListener.onException(new JMSException("Session remotely closed")); + } + } + } + catch (JMSException e) + { + + } + + } + } + }); if(_acknowledgeMode == AcknowledgeMode.SESSION_TRANSACTED) { _txn = _session.createSessionLocalTransaction(); @@ -230,6 +273,7 @@ public class SessionImpl implements Session, QueueSession, TopicSession producer.close(); } _session.close(); + _connection.removeSession(this); } } @@ -765,7 +809,6 @@ public class SessionImpl implements Session, QueueSession, TopicSession return _txn; } - private class Dispatcher implements Runnable { @@ -816,7 +859,6 @@ public class SessionImpl implements Session, QueueSession, TopicSession msg = consumer.receive0(0L); } - MessageListener listener = consumer._messageListener; MessageImpl message = consumer.createJMSMessage(msg, recoveredMessage); @@ -847,7 +889,28 @@ public class SessionImpl implements Session, QueueSession, TopicSession } } + Iterator consumers = _consumers.iterator(); + while(consumers.hasNext()) + { + MessageConsumerImpl consumer = consumers.next(); + try + { + consumer.checkReceiverError(); + } + catch (JMSException e) + { + consumers.remove(); + try + { + _connection.getExceptionListener().onException(e); + consumer.close(); + } + catch (JMSException e1) + { + } + } + } } } diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java index 76608c421b..2c48a6b20f 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java @@ -34,6 +34,7 @@ public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue private SessionImpl _session; private final Set _consumers = Collections.synchronizedSet(new HashSet()); + private boolean _deleted; protected TemporaryQueueImpl(String address, Sender sender, SessionImpl session) { @@ -56,7 +57,8 @@ public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue { if(_consumers.isEmpty()) { - close(); + close(); + _deleted = true; } else { @@ -100,6 +102,6 @@ public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue public boolean isDeleted() { - return _sender == null; + return _deleted; } } diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java index 8e0d07e78b..3ac70a29f2 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java @@ -34,6 +34,7 @@ public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic private SessionImpl _session; private final Set _consumers = Collections.synchronizedSet(new HashSet()); + private boolean _deleted; protected TemporaryTopicImpl(String address, Sender sender, SessionImpl session) { @@ -57,6 +58,7 @@ public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic { if(_consumers.isEmpty()) { + _deleted = true; close(); } else @@ -105,6 +107,6 @@ public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic public boolean isDeleted() { - return _sender == null; + return _deleted; } } diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java index c3193f9fea..5e77b7097c 100644 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java +++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java @@ -173,7 +173,14 @@ public class Demo extends Util Section[] sections = { properties, appProperties, amqpValue}; final Message message1 = new Message(Arrays.asList(sections)); - s.send(message1); + try + { + s.send(message1); + } + catch (Exception e) + { + throw new RuntimeException(e); + } Map sendingLinks = new HashMap(); Map receivingLinks = new HashMap(); @@ -295,7 +302,14 @@ public class Demo extends Util m2propmap.put(VENDOR, vendor); ApplicationProperties m2appProps = new ApplicationProperties(m2propmap); Message m2 = new Message(Arrays.asList(m2props, m2appProps, new AmqpValue("AMQP-"+messageId))); - sender.send(m2); + try + { + sender.send(m2); + } + catch (Exception e) + { + throw new RuntimeException(e); + } Map m3propmap = new HashMap(); m3propmap.put(OPCODE, LOG); @@ -307,8 +321,14 @@ public class Demo extends Util Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap), new AmqpValue("AMQP-"+messageId))); - s.send(m3); - + try + { + s.send(m3); + } + catch (Exception e) + { + throw new RuntimeException(e); + } } responseReceiver.acknowledge(m); @@ -336,7 +356,14 @@ public class Demo extends Util Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap), new AmqpValue("AMQP-"+mp.getMessageId()))); - s.send(m3); + try + { + s.send(m3); + } + catch (Exception e) + { + throw new RuntimeException(e); + } entry.getValue().acknowledge(m); } diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java index 998d68cfa6..06440b8f19 100644 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java +++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java @@ -121,14 +121,7 @@ public class Dump extends Util session.close(); conn.close(); - } catch (ConnectionException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - catch (Sender.SenderCreationException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } catch (Sender.SenderClosingException e) + } catch (Exception e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java index 37550ea52f..c7bcd99312 100644 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java +++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java @@ -248,27 +248,10 @@ public class Filesender extends Util session.close(); conn.close(); } - catch (ConnectionException e) + catch (Exception e) { e.printStackTrace(); } - catch (Sender.SenderCreationException e) - { - e.printStackTrace(); - } catch (FileNotFoundException e) - { - e.printStackTrace(); - } catch (IOException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } catch (NoSuchAlgorithmException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } catch (Sender.SenderClosingException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } private Message createMessageFromFile(MessageDigest md5, String fileName, File file) throws IOException diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java index d8da58dc76..dbe273182f 100644 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java +++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java @@ -216,23 +216,10 @@ public class Request extends Util conn2.close(); } } - catch (ConnectionException e) + catch (Exception e) { e.printStackTrace(); //TODO. } - catch (Sender.SenderClosingException e) - { - e.printStackTrace(); //TODO. - } - catch (Sender.SenderCreationException e) - { - e.printStackTrace(); //TODO. - } - catch (AmqpErrorException e) - { - e.printStackTrace(); //TODO. - } - } protected boolean hasSingleLinkPerConnectionMode() diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java index 6b1b70476e..1e4bcfc7d7 100644 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java +++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java @@ -274,21 +274,13 @@ public class Respond extends Util _conn.close(); System.out.println("Received: " + receivedCount); } - catch (ConnectionException e) - { - e.printStackTrace(); //TODO. - } - catch (Sender.SenderClosingException e) - { - e.printStackTrace(); //TODO. - } - catch (Sender.SenderCreationException e) + catch (Exception e) { e.printStackTrace(); //TODO. } } - private void respond(Message m) throws Sender.SenderCreationException, ConnectionClosedException + private void respond(Message m) throws Sender.SenderCreationException, ConnectionClosedException, LinkDetachedException { List
sections = m.getPayload(); String replyTo = null; diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java index ef1a31005c..b4ae16ab3f 100644 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java +++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java @@ -219,19 +219,10 @@ public class Send extends Util session.close(); conn.close(); } - catch (Sender.SenderClosingException e) + catch (Exception e) { e.printStackTrace(); //TODO. } - catch (ConnectionException e) - { - e.printStackTrace(); //TODO. - } - catch (Sender.SenderCreationException e) - { - e.printStackTrace(); //TODO. - } - } diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java new file mode 100644 index 0000000000..45b00255f2 --- /dev/null +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.type.transport.Error; + +public class LinkDetachedException extends Exception +{ + private final org.apache.qpid.amqp_1_0.type.transport.Error _remoteError; + + public LinkDetachedException(Error remoteError) + { + super(); + _remoteError = remoteError; + } + + public org.apache.qpid.amqp_1_0.type.transport.Error getRemoteError() + { + return _remoteError; + } + +} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java index 6996171707..596931088f 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java @@ -50,6 +50,7 @@ public class Receiver implements DeliveryStateHandler private Map _unsettledMap = new HashMap(); private MessageArrivalListener _messageArrivalListener; private org.apache.qpid.amqp_1_0.type.transport.Error _error; + private Runnable _remoteErrorTask; public Receiver(final Session session, final String linkName, @@ -125,6 +126,10 @@ public class Receiver implements DeliveryStateHandler public void remoteDetached(final LinkEndpoint endpoint, final Detach detach) { _error = detach.getError(); + if(detach.getError()!=null) + { + remoteError(); + } super.remoteDetached(endpoint, detach); } }); @@ -171,6 +176,14 @@ public class Receiver implements DeliveryStateHandler } } + private void remoteError() + { + if(_remoteErrorTask != null) + { + _remoteErrorTask.run(); + } + } + private void postPrefetchAction() { if(_messageArrivalListener != null) @@ -566,6 +579,11 @@ public class Receiver implements DeliveryStateHandler synchronized(_endpoint.getLock()) { _messageArrivalListener = messageArrivalListener; + int prefetchSize = _prefetchQueue.size(); + for(int i = 0; i < prefetchSize; i++) + { + postPrefetchAction(); + } } } @@ -590,4 +608,8 @@ public class Receiver implements DeliveryStateHandler void messageArrived(Receiver receiver); } + public void setRemoteErrorListener(Runnable listener) + { + _remoteErrorTask = listener; + } } diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java index cf9f44af75..0feaa48805 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java @@ -22,7 +22,9 @@ package org.apache.qpid.amqp_1_0.client; import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; +import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.SendingLinkListener; import org.apache.qpid.amqp_1_0.type.*; import org.apache.qpid.amqp_1_0.type.Source; import org.apache.qpid.amqp_1_0.type.Target; @@ -35,6 +37,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.qpid.amqp_1_0.type.transport.Error; public class Sender implements DeliveryStateHandler { @@ -44,6 +47,8 @@ public class Sender implements DeliveryStateHandler private int _windowSize; private Map _outcomeActions = Collections.synchronizedMap(new HashMap()); private boolean _closed; + private Error _error; + private Runnable _remoteErrorTask; public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr) throws SenderCreationException, ConnectionClosedException @@ -166,6 +171,21 @@ public class Sender implements DeliveryStateHandler throw new SenderCreationException("Peer did not create remote endpoint for link, target: " + target.getAddress()); }; } + + _endpoint.setLinkEventListener(new SendingLinkListener.DefaultLinkEventListener() + { + + @Override + public void remoteDetached(final LinkEndpoint endpoint, final Detach detach) + { + _error = detach.getError(); + if(_error != null) + { + remoteError(); + } + super.remoteDetached(endpoint, detach); + } + }); } public Source getSource() @@ -178,22 +198,22 @@ public class Sender implements DeliveryStateHandler return _endpoint.getTarget(); } - public void send(Message message) + public void send(Message message) throws LinkDetachedException { send(message, null, null); } - public void send(Message message, final OutcomeAction action) + public void send(Message message, final OutcomeAction action) throws LinkDetachedException { send(message, null, action); } - public void send(Message message, final Transaction txn) + public void send(Message message, final Transaction txn) throws LinkDetachedException { send(message, txn, null); } - public void send(Message message, final Transaction txn, OutcomeAction action) + public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException { List
sections = message.getPayload(); @@ -245,7 +265,7 @@ public class Sender implements DeliveryStateHandler final Object lock = _endpoint.getLock(); synchronized(lock) { - while(!_endpoint.hasCreditToSend()) + while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached()) { try { @@ -256,6 +276,10 @@ public class Sender implements DeliveryStateHandler e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } } + if(_endpoint.isDetached()) + { + throw new LinkDetachedException(_error); + } if(action != null) { _outcomeActions.put(message.getDeliveryTag(), action); @@ -352,6 +376,21 @@ public class Sender implements DeliveryStateHandler _endpoint.updateDisposition(deliveryTag, state, true); } } + else if(state instanceof TransactionalState) + { + OutcomeAction action; + + if((action = _outcomeActions.remove(deliveryTag)) != null) + { + action.onOutcome(deliveryTag, ((TransactionalState) state).getOutcome()); + } + + } + } + + public SendingLinkEndpoint getEndpoint() + { + return _endpoint; } public Map getRemoteUnsettled() @@ -364,6 +403,26 @@ public class Sender implements DeliveryStateHandler return _session; } + + private void remoteError() + { + if(_remoteErrorTask != null) + { + _remoteErrorTask.run(); + } + } + + + public void setRemoteErrorListener(Runnable listener) + { + _remoteErrorTask = listener; + } + + public Error getError() + { + return _error; + } + public class SenderCreationException extends Exception { public SenderCreationException(Throwable e) diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java index 73c4d4bd95..8e09e093e0 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java @@ -31,7 +31,6 @@ public abstract class CompoundWriter implements ValueWriter private Registry _registry; private static final int LARGE_COMPOUND_THRESHOLD_COUNT = 25; private ValueWriter _delegate; - private Map _writerCache = new HashMap(); public CompoundWriter(final Registry registry) { @@ -305,7 +304,7 @@ public abstract class CompoundWriter implements ValueWriter for(int i = 0; i < getCount(); i++) { Object val = next(); - ValueWriter writer = _registry.getValueWriter(val, _writerCache); + ValueWriter writer = _registry.getValueWriter(val); if(writer == null) { // TODO diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/UnsignedLongWriter.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/UnsignedLongWriter.java index 93e0fea740..6b77620e5e 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/UnsignedLongWriter.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/UnsignedLongWriter.java @@ -127,7 +127,7 @@ public class UnsignedLongWriter implements ValueWriter { _delegate = _zeroByteWriter; } - else if(ulong.compareTo(UnsignedLong.valueOf(256))<0) + else if((ulong.longValue() & 0xffL) == ulong.longValue()) { _delegate = _oneByteWriter; } diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/codec/AMQPDescribedTypeRegistry.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/codec/AMQPDescribedTypeRegistry.java index 4ee0fd1f70..82e6216c21 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/codec/AMQPDescribedTypeRegistry.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/codec/AMQPDescribedTypeRegistry.java @@ -298,102 +298,71 @@ public class AMQPDescribedTypeRegistry implements DescribedTypeConstructorRegist private final Map _writerMap = new HashMap(); - private final Map _cachedWriters = new HashMap(); public ValueWriter getValueWriter(V value, Map localCache) { - Class clazz = value == null ? Void.TYPE : value.getClass(); - ValueWriter writer = null; // TODO localCache.get(clazz); - if(writer == null || !writer.isComplete()) - { - writer = getValueWriter(value); - localCache.put(clazz, writer); - } - else - { - writer.setValue(value); - } + return getValueWriter(value); - - return writer; } public ValueWriter getValueWriter(V value) { + ValueWriter writer; Class clazz = value == null ? Void.TYPE : value.getClass(); - ValueWriter writer = null; // TODO _cachedWriters.get(clazz); - if(writer == null || !writer.isComplete()) + ValueWriter.Factory factory = (ValueWriter.Factory) (_writerMap.get(clazz)); + + if(factory == null) { - ValueWriter.Factory factory = (ValueWriter.Factory) (_writerMap.get(clazz)); + if(value instanceof List) + { + factory = _writerMap.get(List.class); + _writerMap.put(value.getClass(), factory); + writer = factory.newInstance(this); + writer.setValue(value); - if(factory == null) + } + else if(value instanceof Map) { - if(value instanceof List) - { - factory = _writerMap.get(List.class); - _writerMap.put(value.getClass(), factory); - writer = factory.newInstance(this); - if(writer.isCacheable()) - { - _cachedWriters.put(clazz, writer); - } - writer.setValue(value); + factory = _writerMap.get(Map.class); + _writerMap.put(value.getClass(), factory); + writer = factory.newInstance(this); + writer.setValue(value); - } - else if(value instanceof Map) + } + else if(value.getClass().isArray()) + { + if(RestrictedType.class.isAssignableFrom(value.getClass().getComponentType())) { - factory = _writerMap.get(Map.class); - _writerMap.put(value.getClass(), factory); - writer = factory.newInstance(this); - if(writer.isCacheable()) + RestrictedType[] restrictedTypes = (RestrictedType[]) value; + Object[] newVals = (Object[]) Array.newInstance(restrictedTypes[0].getValue().getClass(), + restrictedTypes.length); + for(int i = 0; i < restrictedTypes.length; i++) { - _cachedWriters.put(clazz, writer); + newVals[i] = restrictedTypes[i].getValue(); } - writer.setValue(value); - + return (ValueWriter) getValueWriter(newVals); } - else if(value.getClass().isArray()) - { - if(RestrictedType.class.isAssignableFrom(value.getClass().getComponentType())) - { - RestrictedType[] restrictedTypes = (RestrictedType[]) value; - Object[] newVals = (Object[]) Array.newInstance(restrictedTypes[0].getValue().getClass(), - restrictedTypes.length); - for(int i = 0; i < restrictedTypes.length; i++) - { - newVals[i] = restrictedTypes[i].getValue(); - } - return (ValueWriter) getValueWriter(newVals); - } - // TODO primitive array types - factory = _writerMap.get(List.class); - writer = factory.newInstance(this); - writer.setValue(Arrays.asList((Object[])value)); + // TODO primitive array types + factory = _writerMap.get(List.class); + writer = factory.newInstance(this); + writer.setValue(Arrays.asList((Object[])value)); - } - else - { - return null; - } } else { - writer = factory.newInstance(this); - if(writer.isCacheable()) - { - _cachedWriters.put(clazz, writer); - } - writer.setValue(value); + return null; } } else { + writer = factory.newInstance(this); writer.setValue(value); } + return writer; } -- cgit v1.2.1