summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-05-15 13:15:04 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-05-15 13:15:04 +0000
commit9d5f78ba71ce4503758f97f0eb5c659850a621ae (patch)
treecd79603ba5da696e7d111c9182ee631f5cb5e000
parent62fa41c41ec453be13e4a8e5846139386c3c8c4c (diff)
downloadqpid-python-9d5f78ba71ce4503758f97f0eb5c659850a621ae.tar.gz
Merge fixes for JMS AMQP 1-0 client from trunk to 0.22 release
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.22@1482809 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java29
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java52
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java34
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java60
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java117
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java69
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java6
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java4
-rw-r--r--qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java37
-rw-r--r--qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java9
-rw-r--r--qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java19
-rw-r--r--qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java15
-rw-r--r--qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java12
-rw-r--r--qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java11
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java40
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java22
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java69
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java3
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/UnsignedLongWriter.java2
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/codec/AMQPDescribedTypeRegistry.java99
20 files changed, 528 insertions, 181 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java
new file mode 100644
index 0000000000..bc2b6349c8
--- /dev/null
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms;
+
+import javax.jms.JMSException;
+
+public class MessageRejectedException extends JMSException
+{
+ public MessageRejectedException(String s)
+ {
+ super(s);
+ }
+}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
index 4856a7c491..96788ecd27 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
@@ -45,6 +45,7 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection
private String _queuePrefix;
private String _topicPrefix;
+ private boolean _useBinaryMessageId;
public ConnectionFactoryImpl(final String host,
final int port,
@@ -100,6 +101,7 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection
ConnectionImpl connection = new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl);
connection.setQueuePrefix(_queuePrefix);
connection.setTopicPrefix(_topicPrefix);
+ connection.setUseBinaryMessageId(_useBinaryMessageId);
return connection;
}
@@ -149,6 +151,9 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection
String password = null;
String clientId = null;
String remoteHost = null;
+
+ boolean binaryMessageId = true;
+
if(userInfo != null)
{
String[] components = userInfo.split(":",2);
@@ -161,22 +166,26 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection
String query = url.getQuery();
if(query != null)
{
- for(String param : query.split("&"))
- {
- String[] keyValuePair = param.split("=",2);
- if(keyValuePair[0].equalsIgnoreCase("clientid"))
- {
- clientId = keyValuePair[1];
- }
- else if(keyValuePair[0].equalsIgnoreCase("ssl"))
- {
- ssl = Boolean.valueOf(keyValuePair[1]);
- }
- else if(keyValuePair[0].equalsIgnoreCase("remote-host"))
- {
- remoteHost = keyValuePair[1];
- }
- }
+ for(String param : query.split("&"))
+ {
+ String[] keyValuePair = param.split("=",2);
+ if(keyValuePair[0].equalsIgnoreCase("clientid"))
+ {
+ clientId = keyValuePair[1];
+ }
+ else if(keyValuePair[0].equalsIgnoreCase("ssl"))
+ {
+ ssl = Boolean.valueOf(keyValuePair[1]);
+ }
+ else if(keyValuePair[0].equalsIgnoreCase("remote-host"))
+ {
+ remoteHost = keyValuePair[1];
+ }
+ else if (keyValuePair[0].equalsIgnoreCase("binary-messageid"))
+ {
+ binaryMessageId = Boolean.parseBoolean(keyValuePair[1]);
+ }
+ }
}
if(remoteHost == null)
@@ -184,7 +193,11 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection
remoteHost = host;
}
- return new ConnectionFactoryImpl(host, port, username, password, clientId, remoteHost, ssl);
+ ConnectionFactoryImpl connectionFactory =
+ new ConnectionFactoryImpl(host, port, username, password, clientId, remoteHost, ssl);
+ connectionFactory.setUseBinaryMessageId(binaryMessageId);
+
+ return connectionFactory;
}
@@ -235,4 +248,9 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection
{
_queuePrefix = queuePrefix;
}
+
+ public void setUseBinaryMessageId(boolean useBinaryMessageId)
+ {
+ _useBinaryMessageId = useBinaryMessageId;
+ }
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
index 73701889b5..0ad62fd730 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
@@ -56,7 +56,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
private String _clientId;
private String _queuePrefix;
private String _topicPrefix;
-
+ private boolean _useBinaryMessageId = Boolean.parseBoolean(System.getProperty("qpid.use_binary_message_id", "true"));
private static enum State
{
@@ -163,6 +163,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
connect();
started = true;
}
+
try
{
SessionImpl session = new SessionImpl(this, acknowledgeMode);
@@ -170,6 +171,11 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
session.setTopicSession(_isTopicConnection);
_sessions.add(session);
+ if(_state == State.STARTED)
+ {
+ session.start();
+ }
+
return session;
}
catch(JMSException e)
@@ -191,9 +197,17 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
throw e;
}
}
-
}
+
+ }
+
+ void removeSession(SessionImpl session)
+ {
+ synchronized (_lock)
+ {
+ _sessions.remove(session);
+ }
}
private void reconnect(String networkHost, int port, String hostName)
@@ -410,10 +424,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
public boolean isStarted()
{
- synchronized (_lock)
- {
- return _state == State.STARTED;
- }
+ return _state == State.STARTED;
}
void setQueueConnection(final boolean queueConnection)
@@ -499,4 +510,15 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
return new DecodedDestination(address, kind);
}
+ void setUseBinaryMessageId(boolean useBinaryMessageId)
+ {
+ _useBinaryMessageId = useBinaryMessageId;
+ }
+
+ boolean useBinaryMessageId()
+ {
+ return _useBinaryMessageId;
+ }
+
+
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
index def5ae5931..d2b34e0f13 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.jms.Destination;
+import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.InvalidSelectorException;
@@ -117,6 +118,29 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi
_session = session;
_receiver = createClientReceiver();
+ _receiver.setRemoteErrorListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ final ExceptionListener exceptionListener = _session.getConnection().getExceptionListener();
+
+ if(exceptionListener != null)
+ {
+ final Error receiverError = _receiver.getError();
+ exceptionListener.onException(new JMSException(receiverError.getDescription(),
+ receiverError.getCondition().getValue().toString()));
+
+ }
+ }
+ catch (JMSException e)
+ {
+
+ }
+ }
+ });
}
@@ -125,8 +149,8 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi
{
try
{
- return _session.getClientSession(). createReceiver(_session.toAddress(_destination), AcknowledgeMode.ALO,
- _linkName, _durable, getFilters(), null);
+ return _session.getClientSession().createReceiver(_session.toAddress(_destination), AcknowledgeMode.ALO,
+ _linkName, _durable, getFilters(), null);
}
catch (ConnectionErrorException e)
{
@@ -188,15 +212,16 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi
{
checkClosed();
_messageListener = messageListener;
- _session.messageListenerSet( this );
_receiver.setMessageArrivalListener(new Receiver.MessageArrivalListener()
- {
+ {
+
+ public void messageArrived(final Receiver receiver)
+ {
+ _session.messageArrived(MessageConsumerImpl.this);
+ }
+ });
+ _session.messageListenerSet( this );
- public void messageArrived(final Receiver receiver)
- {
- _session.messageArrived(MessageConsumerImpl.this);
- }
- });
}
public MessageImpl receive() throws JMSException
@@ -219,12 +244,14 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi
return receiveImpl(0L);
}
- private MessageImpl receiveImpl(long timeout) throws IllegalStateException
+ private MessageImpl receiveImpl(long timeout) throws JMSException
{
+
org.apache.qpid.amqp_1_0.client.Message msg;
boolean redelivery;
if(_replaymessages.isEmpty())
{
+ checkReceiverError();
msg = receive0(timeout);
redelivery = false;
}
@@ -241,8 +268,21 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi
return createJMSMessage(msg, redelivery);
}
+ void checkReceiverError() throws JMSException
+ {
+ final Error receiverError = _receiver.getError();
+ if(receiverError != null)
+ {
+ JMSException jmsException =
+ new JMSException(receiverError.getDescription(), receiverError.getCondition().toString());
+
+ throw jmsException;
+ }
+ }
+
Message receive0(final long timeout)
{
+
Message message = _receiver.receive(timeout);
if(_session.getAckModeEnum() == Session.AcknowledgeMode.CLIENT_ACKNOWLEDGE)
{
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
index 77544e4112..79c1606edb 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
@@ -19,17 +19,22 @@
package org.apache.qpid.amqp_1_0.jms.impl;
import org.apache.qpid.amqp_1_0.client.ConnectionClosedException;
+import org.apache.qpid.amqp_1_0.client.LinkDetachedException;
import org.apache.qpid.amqp_1_0.client.Sender;
import org.apache.qpid.amqp_1_0.jms.MessageProducer;
+import org.apache.qpid.amqp_1_0.jms.MessageRejectedException;
import org.apache.qpid.amqp_1_0.jms.QueueSender;
import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Outcome;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
import javax.jms.*;
import javax.jms.IllegalStateException;
import java.util.UUID;
+import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.amqp_1_0.type.transport.*;
public class MessageProducerImpl implements MessageProducer, QueueSender, TopicPublisher
{
@@ -43,6 +48,8 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP
private SessionImpl _session;
private Sender _sender;
private boolean _closed;
+ private boolean _syncPublish = Boolean.getBoolean("qpid.sync_publish");
+ private long _syncPublishTimeout = Long.getLong("qpid.sync_publish_timeout", 30000l);
protected MessageProducerImpl(final Destination destination,
final SessionImpl session) throws JMSException
@@ -81,6 +88,29 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP
jmsEx.setLinkedException(e);
throw jmsEx;
}
+ _sender.setRemoteErrorListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ final ExceptionListener exceptionListener = _session.getConnection().getExceptionListener();
+
+ if(exceptionListener != null)
+ {
+ final org.apache.qpid.amqp_1_0.type.transport.Error receiverError = _sender.getError();
+ exceptionListener.onException(new JMSException(receiverError.getDescription(),
+ receiverError.getCondition().getValue().toString()));
+
+ }
+ }
+ catch (JMSException e)
+ {
+
+ }
+ }
+ });
}
}
@@ -234,7 +264,7 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP
if(!getDisableMessageID() && msg.getMessageId() == null)
{
- final Binary messageId = generateMessageId();
+ final Object messageId = generateMessageId();
msg.setMessageId(messageId);
}
@@ -251,7 +281,28 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP
final org.apache.qpid.amqp_1_0.client.Message clientMessage = new org.apache.qpid.amqp_1_0.client.Message(msg.getSections());
- _sender.send(clientMessage, _session.getTxn());
+ DispositionAction action = null;
+
+ if(_syncPublish)
+ {
+ action = new DispositionAction(_sender);
+ }
+
+ try
+ {
+ _sender.send(clientMessage, _session.getTxn(), action);
+ }
+ catch (LinkDetachedException e)
+ {
+ JMSException jmsException = new InvalidDestinationException("Sender has been closed");
+ jmsException.setLinkedException(e);
+ throw jmsException;
+ }
+
+ if(_syncPublish && !action.wasAccepted(_syncPublishTimeout + System.currentTimeMillis()))
+ {
+ throw new MessageRejectedException("Message was rejected");
+ }
if(getDestination() != null)
{
@@ -270,10 +321,11 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP
send((Destination)queue, message, deliveryMode, priority, ttl);
}
- private Binary generateMessageId()
+ private Object generateMessageId()
{
UUID uuid = UUID.randomUUID();
- return new Binary(uuid.toString().getBytes());
+ final String messageIdString = uuid.toString();
+ return _session.getConnection().useBinaryMessageId() ? new Binary(messageIdString.getBytes()) : messageIdString;
}
public void send(final Destination destination, final Message message) throws JMSException
@@ -377,4 +429,61 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP
{
send(topic, message, deliveryMode, priority, ttl);
}
+
+ private static class DispositionAction implements Sender.OutcomeAction
+ {
+ private final Sender _sender;
+ private final Object _lock;
+ private Outcome _outcome;
+
+ public DispositionAction(Sender sender)
+ {
+ _sender = sender;
+ _lock = sender.getEndpoint().getLock();
+ }
+
+ @Override
+ public void onOutcome(Binary deliveryTag, Outcome outcome)
+ {
+ synchronized (_lock)
+ {
+ _outcome = outcome;
+ _lock.notifyAll();
+ }
+ }
+
+ public boolean wasAccepted(long timeout) throws JMSException
+ {
+ synchronized(_lock)
+ {
+ while(_outcome == null && !_sender.getEndpoint().isDetached())
+ {
+ try
+ {
+ _lock.wait(timeout - System.currentTimeMillis());
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+ if(_outcome == null)
+ {
+
+ if(_sender.getEndpoint().isDetached())
+ {
+ throw new JMSException("Link was detached");
+ }
+ else
+ {
+ throw new JMSException("Timed out waiting for message acceptance");
+ }
+ }
+ else
+ {
+ return _outcome instanceof Accepted;
+ }
+ }
+ }
+ }
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
index ba487cc3f6..2ae67913fe 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
@@ -21,10 +21,12 @@ package org.apache.qpid.amqp_1_0.jms.impl;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Enumeration;
+import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import javax.jms.BytesMessage;
import javax.jms.Destination;
+import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
@@ -52,9 +54,11 @@ import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
import org.apache.qpid.amqp_1_0.jms.TopicSession;
import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
+import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
import org.apache.qpid.amqp_1_0.type.messaging.Source;
import org.apache.qpid.amqp_1_0.type.messaging.Target;
-import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
public class SessionImpl implements Session, QueueSession, TopicSession
{
@@ -90,6 +94,45 @@ public class SessionImpl implements Session, QueueSession, TopicSession
jmsException.setLinkedException(e);
throw jmsException;
}
+ _session.getEndpoint().setSessionEventListener(new SessionEventListener.DefaultSessionEventListener()
+ {
+ @Override
+ public void remoteEnd(End end)
+ {
+ if(!_closed)
+ {
+ try
+ {
+ close();
+ }
+ catch (JMSException e)
+ {
+ }
+ try
+ {
+ final Error error = end.getError();
+ final ExceptionListener exceptionListener = _connection.getExceptionListener();
+ if(exceptionListener != null)
+ {
+ if(error != null)
+ {
+ exceptionListener.onException(new JMSException(error.getDescription(),
+ error.getCondition().getValue().toString()));
+ }
+ else
+ {
+ exceptionListener.onException(new JMSException("Session remotely closed"));
+ }
+ }
+ }
+ catch (JMSException e)
+ {
+
+ }
+
+ }
+ }
+ });
if(_acknowledgeMode == AcknowledgeMode.SESSION_TRANSACTED)
{
_txn = _session.createSessionLocalTransaction();
@@ -230,6 +273,7 @@ public class SessionImpl implements Session, QueueSession, TopicSession
producer.close();
}
_session.close();
+ _connection.removeSession(this);
}
}
@@ -765,7 +809,6 @@ public class SessionImpl implements Session, QueueSession, TopicSession
return _txn;
}
-
private class Dispatcher implements Runnable
{
@@ -816,7 +859,6 @@ public class SessionImpl implements Session, QueueSession, TopicSession
msg = consumer.receive0(0L);
}
-
MessageListener listener = consumer._messageListener;
MessageImpl message = consumer.createJMSMessage(msg, recoveredMessage);
@@ -847,7 +889,28 @@ public class SessionImpl implements Session, QueueSession, TopicSession
}
}
+ Iterator<MessageConsumerImpl> consumers = _consumers.iterator();
+ while(consumers.hasNext())
+ {
+ MessageConsumerImpl consumer = consumers.next();
+ try
+ {
+ consumer.checkReceiverError();
+ }
+ catch (JMSException e)
+ {
+ consumers.remove();
+ try
+ {
+ _connection.getExceptionListener().onException(e);
+ consumer.close();
+ }
+ catch (JMSException e1)
+ {
+ }
+ }
+ }
}
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java
index 76608c421b..2c48a6b20f 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java
@@ -34,6 +34,7 @@ public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue
private SessionImpl _session;
private final Set<MessageConsumer> _consumers =
Collections.synchronizedSet(new HashSet<MessageConsumer>());
+ private boolean _deleted;
protected TemporaryQueueImpl(String address, Sender sender, SessionImpl session)
{
@@ -56,7 +57,8 @@ public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue
{
if(_consumers.isEmpty())
{
- close();
+ close();
+ _deleted = true;
}
else
{
@@ -100,6 +102,6 @@ public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue
public boolean isDeleted()
{
- return _sender == null;
+ return _deleted;
}
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java
index 8e0d07e78b..3ac70a29f2 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java
@@ -34,6 +34,7 @@ public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic
private SessionImpl _session;
private final Set<MessageConsumer> _consumers =
Collections.synchronizedSet(new HashSet<MessageConsumer>());
+ private boolean _deleted;
protected TemporaryTopicImpl(String address, Sender sender, SessionImpl session)
{
@@ -57,6 +58,7 @@ public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic
{
if(_consumers.isEmpty())
{
+ _deleted = true;
close();
}
else
@@ -105,6 +107,6 @@ public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic
public boolean isDeleted()
{
- return _sender == null;
+ return _deleted;
}
}
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java
index c3193f9fea..5e77b7097c 100644
--- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java
+++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java
@@ -173,7 +173,14 @@ public class Demo extends Util
Section[] sections = { properties, appProperties, amqpValue};
final Message message1 = new Message(Arrays.asList(sections));
- s.send(message1);
+ try
+ {
+ s.send(message1);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
Map<Object, Sender> sendingLinks = new HashMap<Object, Sender>();
Map<Object, Receiver> receivingLinks = new HashMap<Object, Receiver>();
@@ -295,7 +302,14 @@ public class Demo extends Util
m2propmap.put(VENDOR, vendor);
ApplicationProperties m2appProps = new ApplicationProperties(m2propmap);
Message m2 = new Message(Arrays.asList(m2props, m2appProps, new AmqpValue("AMQP-"+messageId)));
- sender.send(m2);
+ try
+ {
+ sender.send(m2);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
Map m3propmap = new HashMap();
m3propmap.put(OPCODE, LOG);
@@ -307,8 +321,14 @@ public class Demo extends Util
Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap),
new AmqpValue("AMQP-"+messageId)));
- s.send(m3);
-
+ try
+ {
+ s.send(m3);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
}
responseReceiver.acknowledge(m);
@@ -336,7 +356,14 @@ public class Demo extends Util
Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap),
new AmqpValue("AMQP-"+mp.getMessageId())));
- s.send(m3);
+ try
+ {
+ s.send(m3);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
entry.getValue().acknowledge(m);
}
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java
index 998d68cfa6..06440b8f19 100644
--- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java
+++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java
@@ -121,14 +121,7 @@ public class Dump extends Util
session.close();
conn.close();
- } catch (ConnectionException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- catch (Sender.SenderCreationException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- } catch (Sender.SenderClosingException e)
+ } catch (Exception e)
{
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java
index 37550ea52f..c7bcd99312 100644
--- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java
+++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java
@@ -248,27 +248,10 @@ public class Filesender extends Util
session.close();
conn.close();
}
- catch (ConnectionException e)
+ catch (Exception e)
{
e.printStackTrace();
}
- catch (Sender.SenderCreationException e)
- {
- e.printStackTrace();
- } catch (FileNotFoundException e)
- {
- e.printStackTrace();
- } catch (IOException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- } catch (NoSuchAlgorithmException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- } catch (Sender.SenderClosingException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
-
}
private Message createMessageFromFile(MessageDigest md5, String fileName, File file) throws IOException
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
index d8da58dc76..dbe273182f 100644
--- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
+++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
@@ -216,23 +216,10 @@ public class Request extends Util
conn2.close();
}
}
- catch (ConnectionException e)
+ catch (Exception e)
{
e.printStackTrace(); //TODO.
}
- catch (Sender.SenderClosingException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderCreationException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (AmqpErrorException e)
- {
- e.printStackTrace(); //TODO.
- }
-
}
protected boolean hasSingleLinkPerConnectionMode()
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
index 6b1b70476e..1e4bcfc7d7 100644
--- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
+++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
@@ -274,21 +274,13 @@ public class Respond extends Util
_conn.close();
System.out.println("Received: " + receivedCount);
}
- catch (ConnectionException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderClosingException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderCreationException e)
+ catch (Exception e)
{
e.printStackTrace(); //TODO.
}
}
- private void respond(Message m) throws Sender.SenderCreationException, ConnectionClosedException
+ private void respond(Message m) throws Sender.SenderCreationException, ConnectionClosedException, LinkDetachedException
{
List<Section> sections = m.getPayload();
String replyTo = null;
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java
index ef1a31005c..b4ae16ab3f 100644
--- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java
+++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java
@@ -219,19 +219,10 @@ public class Send extends Util
session.close();
conn.close();
}
- catch (Sender.SenderClosingException e)
+ catch (Exception e)
{
e.printStackTrace(); //TODO.
}
- catch (ConnectionException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderCreationException e)
- {
- e.printStackTrace(); //TODO.
- }
-
}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java
new file mode 100644
index 0000000000..45b00255f2
--- /dev/null
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+
+public class LinkDetachedException extends Exception
+{
+ private final org.apache.qpid.amqp_1_0.type.transport.Error _remoteError;
+
+ public LinkDetachedException(Error remoteError)
+ {
+ super();
+ _remoteError = remoteError;
+ }
+
+ public org.apache.qpid.amqp_1_0.type.transport.Error getRemoteError()
+ {
+ return _remoteError;
+ }
+
+}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
index 6996171707..596931088f 100644
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
@@ -50,6 +50,7 @@ public class Receiver implements DeliveryStateHandler
private Map<Binary, SettledAction> _unsettledMap = new HashMap<Binary, SettledAction>();
private MessageArrivalListener _messageArrivalListener;
private org.apache.qpid.amqp_1_0.type.transport.Error _error;
+ private Runnable _remoteErrorTask;
public Receiver(final Session session,
final String linkName,
@@ -125,6 +126,10 @@ public class Receiver implements DeliveryStateHandler
public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
{
_error = detach.getError();
+ if(detach.getError()!=null)
+ {
+ remoteError();
+ }
super.remoteDetached(endpoint, detach);
}
});
@@ -171,6 +176,14 @@ public class Receiver implements DeliveryStateHandler
}
}
+ private void remoteError()
+ {
+ if(_remoteErrorTask != null)
+ {
+ _remoteErrorTask.run();
+ }
+ }
+
private void postPrefetchAction()
{
if(_messageArrivalListener != null)
@@ -566,6 +579,11 @@ public class Receiver implements DeliveryStateHandler
synchronized(_endpoint.getLock())
{
_messageArrivalListener = messageArrivalListener;
+ int prefetchSize = _prefetchQueue.size();
+ for(int i = 0; i < prefetchSize; i++)
+ {
+ postPrefetchAction();
+ }
}
}
@@ -590,4 +608,8 @@ public class Receiver implements DeliveryStateHandler
void messageArrived(Receiver receiver);
}
+ public void setRemoteErrorListener(Runnable listener)
+ {
+ _remoteErrorTask = listener;
+ }
}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
index cf9f44af75..0feaa48805 100644
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
@@ -22,7 +22,9 @@ package org.apache.qpid.amqp_1_0.client;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
import org.apache.qpid.amqp_1_0.type.*;
import org.apache.qpid.amqp_1_0.type.Source;
import org.apache.qpid.amqp_1_0.type.Target;
@@ -35,6 +37,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
public class Sender implements DeliveryStateHandler
{
@@ -44,6 +47,8 @@ public class Sender implements DeliveryStateHandler
private int _windowSize;
private Map<Binary, OutcomeAction> _outcomeActions = Collections.synchronizedMap(new HashMap<Binary, OutcomeAction>());
private boolean _closed;
+ private Error _error;
+ private Runnable _remoteErrorTask;
public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr)
throws SenderCreationException, ConnectionClosedException
@@ -166,6 +171,21 @@ public class Sender implements DeliveryStateHandler
throw new SenderCreationException("Peer did not create remote endpoint for link, target: " + target.getAddress());
};
}
+
+ _endpoint.setLinkEventListener(new SendingLinkListener.DefaultLinkEventListener()
+ {
+
+ @Override
+ public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
+ {
+ _error = detach.getError();
+ if(_error != null)
+ {
+ remoteError();
+ }
+ super.remoteDetached(endpoint, detach);
+ }
+ });
}
public Source getSource()
@@ -178,22 +198,22 @@ public class Sender implements DeliveryStateHandler
return _endpoint.getTarget();
}
- public void send(Message message)
+ public void send(Message message) throws LinkDetachedException
{
send(message, null, null);
}
- public void send(Message message, final OutcomeAction action)
+ public void send(Message message, final OutcomeAction action) throws LinkDetachedException
{
send(message, null, action);
}
- public void send(Message message, final Transaction txn)
+ public void send(Message message, final Transaction txn) throws LinkDetachedException
{
send(message, txn, null);
}
- public void send(Message message, final Transaction txn, OutcomeAction action)
+ public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException
{
List<Section> sections = message.getPayload();
@@ -245,7 +265,7 @@ public class Sender implements DeliveryStateHandler
final Object lock = _endpoint.getLock();
synchronized(lock)
{
- while(!_endpoint.hasCreditToSend())
+ while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached())
{
try
{
@@ -256,6 +276,10 @@ public class Sender implements DeliveryStateHandler
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
+ if(_endpoint.isDetached())
+ {
+ throw new LinkDetachedException(_error);
+ }
if(action != null)
{
_outcomeActions.put(message.getDeliveryTag(), action);
@@ -352,6 +376,21 @@ public class Sender implements DeliveryStateHandler
_endpoint.updateDisposition(deliveryTag, state, true);
}
}
+ else if(state instanceof TransactionalState)
+ {
+ OutcomeAction action;
+
+ if((action = _outcomeActions.remove(deliveryTag)) != null)
+ {
+ action.onOutcome(deliveryTag, ((TransactionalState) state).getOutcome());
+ }
+
+ }
+ }
+
+ public SendingLinkEndpoint getEndpoint()
+ {
+ return _endpoint;
}
public Map<Binary, DeliveryState> getRemoteUnsettled()
@@ -364,6 +403,26 @@ public class Sender implements DeliveryStateHandler
return _session;
}
+
+ private void remoteError()
+ {
+ if(_remoteErrorTask != null)
+ {
+ _remoteErrorTask.run();
+ }
+ }
+
+
+ public void setRemoteErrorListener(Runnable listener)
+ {
+ _remoteErrorTask = listener;
+ }
+
+ public Error getError()
+ {
+ return _error;
+ }
+
public class SenderCreationException extends Exception
{
public SenderCreationException(Throwable e)
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java
index 73c4d4bd95..8e09e093e0 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java
@@ -31,7 +31,6 @@ public abstract class CompoundWriter<V> implements ValueWriter<V>
private Registry _registry;
private static final int LARGE_COMPOUND_THRESHOLD_COUNT = 25;
private ValueWriter _delegate;
- private Map<Class, ValueWriter> _writerCache = new HashMap<Class, ValueWriter>();
public CompoundWriter(final Registry registry)
{
@@ -305,7 +304,7 @@ public abstract class CompoundWriter<V> implements ValueWriter<V>
for(int i = 0; i < getCount(); i++)
{
Object val = next();
- ValueWriter writer = _registry.getValueWriter(val, _writerCache);
+ ValueWriter writer = _registry.getValueWriter(val);
if(writer == null)
{
// TODO
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/UnsignedLongWriter.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/UnsignedLongWriter.java
index 93e0fea740..6b77620e5e 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/UnsignedLongWriter.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/UnsignedLongWriter.java
@@ -127,7 +127,7 @@ public class UnsignedLongWriter implements ValueWriter<UnsignedLong>
{
_delegate = _zeroByteWriter;
}
- else if(ulong.compareTo(UnsignedLong.valueOf(256))<0)
+ else if((ulong.longValue() & 0xffL) == ulong.longValue())
{
_delegate = _oneByteWriter;
}
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/codec/AMQPDescribedTypeRegistry.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/codec/AMQPDescribedTypeRegistry.java
index 4ee0fd1f70..82e6216c21 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/codec/AMQPDescribedTypeRegistry.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/codec/AMQPDescribedTypeRegistry.java
@@ -298,102 +298,71 @@ public class AMQPDescribedTypeRegistry implements DescribedTypeConstructorRegist
private final Map<Class, ValueWriter.Factory> _writerMap = new HashMap<Class, ValueWriter.Factory>();
- private final Map<Class, ValueWriter> _cachedWriters = new HashMap<Class,ValueWriter>();
public <V extends Object> ValueWriter<V> getValueWriter(V value, Map<Class, ValueWriter> localCache)
{
- Class<? extends Object> clazz = value == null ? Void.TYPE : value.getClass();
- ValueWriter writer = null; // TODO localCache.get(clazz);
- if(writer == null || !writer.isComplete())
- {
- writer = getValueWriter(value);
- localCache.put(clazz, writer);
- }
- else
- {
- writer.setValue(value);
- }
+ return getValueWriter(value);
-
- return writer;
}
public <V extends Object> ValueWriter<V> getValueWriter(V value)
{
+ ValueWriter writer;
Class<? extends Object> clazz = value == null ? Void.TYPE : value.getClass();
- ValueWriter writer = null; // TODO _cachedWriters.get(clazz);
- if(writer == null || !writer.isComplete())
+ ValueWriter.Factory<V> factory = (ValueWriter.Factory<V>) (_writerMap.get(clazz));
+
+ if(factory == null)
{
- ValueWriter.Factory<V> factory = (ValueWriter.Factory<V>) (_writerMap.get(clazz));
+ if(value instanceof List)
+ {
+ factory = _writerMap.get(List.class);
+ _writerMap.put(value.getClass(), factory);
+ writer = factory.newInstance(this);
+ writer.setValue(value);
- if(factory == null)
+ }
+ else if(value instanceof Map)
{
- if(value instanceof List)
- {
- factory = _writerMap.get(List.class);
- _writerMap.put(value.getClass(), factory);
- writer = factory.newInstance(this);
- if(writer.isCacheable())
- {
- _cachedWriters.put(clazz, writer);
- }
- writer.setValue(value);
+ factory = _writerMap.get(Map.class);
+ _writerMap.put(value.getClass(), factory);
+ writer = factory.newInstance(this);
+ writer.setValue(value);
- }
- else if(value instanceof Map)
+ }
+ else if(value.getClass().isArray())
+ {
+ if(RestrictedType.class.isAssignableFrom(value.getClass().getComponentType()))
{
- factory = _writerMap.get(Map.class);
- _writerMap.put(value.getClass(), factory);
- writer = factory.newInstance(this);
- if(writer.isCacheable())
+ RestrictedType[] restrictedTypes = (RestrictedType[]) value;
+ Object[] newVals = (Object[]) Array.newInstance(restrictedTypes[0].getValue().getClass(),
+ restrictedTypes.length);
+ for(int i = 0; i < restrictedTypes.length; i++)
{
- _cachedWriters.put(clazz, writer);
+ newVals[i] = restrictedTypes[i].getValue();
}
- writer.setValue(value);
-
+ return (ValueWriter<V>) getValueWriter(newVals);
}
- else if(value.getClass().isArray())
- {
- if(RestrictedType.class.isAssignableFrom(value.getClass().getComponentType()))
- {
- RestrictedType[] restrictedTypes = (RestrictedType[]) value;
- Object[] newVals = (Object[]) Array.newInstance(restrictedTypes[0].getValue().getClass(),
- restrictedTypes.length);
- for(int i = 0; i < restrictedTypes.length; i++)
- {
- newVals[i] = restrictedTypes[i].getValue();
- }
- return (ValueWriter<V>) getValueWriter(newVals);
- }
- // TODO primitive array types
- factory = _writerMap.get(List.class);
- writer = factory.newInstance(this);
- writer.setValue(Arrays.asList((Object[])value));
+ // TODO primitive array types
+ factory = _writerMap.get(List.class);
+ writer = factory.newInstance(this);
+ writer.setValue(Arrays.asList((Object[])value));
- }
- else
- {
- return null;
- }
}
else
{
- writer = factory.newInstance(this);
- if(writer.isCacheable())
- {
- _cachedWriters.put(clazz, writer);
- }
- writer.setValue(value);
+ return null;
}
}
else
{
+ writer = factory.newInstance(this);
writer.setValue(value);
}
+
return writer;
}