diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2013-06-19 10:17:51 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2013-06-19 10:17:51 +0000 |
commit | 118c5c885463451ba8032fa109225c18a08efa45 (patch) | |
tree | 44c82228cb94ab4fa3b130ee0ac991aa8179951a | |
parent | 9344716446e05be74014a1e7f049ddf13edde03f (diff) | |
download | qpid-python-118c5c885463451ba8032fa109225c18a08efa45.tar.gz |
Merge from trunk up to r1494530
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-vhost-refactor@1494534 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 63 insertions, 32 deletions
diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/DeliveryAnnotationsWriter.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/DeliveryAnnotationsWriter.java index e22c26b290..f3404d78aa 100644 --- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/DeliveryAnnotationsWriter.java +++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/DeliveryAnnotationsWriter.java @@ -60,7 +60,7 @@ public class DeliveryAnnotationsWriter extends AbstractDescribedTypeWriter<Deliv @Override protected ValueWriter createDescribedWriter() { - return getRegistry().getValueWriter(_value); + return getRegistry().getValueWriter(_value.getValue()); } private static Factory<DeliveryAnnotations> FACTORY = new Factory<DeliveryAnnotations>() diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 1baaff738b..6b87316e87 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -24,7 +24,6 @@ import static org.apache.qpid.transport.Option.UNRELIABLE; import java.lang.ref.WeakReference; import java.util.ArrayList; -import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.Iterator; @@ -95,6 +94,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_10.class); private static Timer timer = new Timer("ack-flusher", true); + private final String _name; private static class Flusher extends TimerTask { @@ -153,6 +153,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic private boolean _isHardError = Boolean.getBoolean("qpid.session.legacy_exception_behaviour"); //--- constructors + /** * Creates a new session on a connection. * @@ -173,28 +174,38 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark, defaultPrefetchLowMark); _qpidConnection = qpidConnection; - if (name == null) + _name = name; + _qpidSession = createSession(); + + if (maxAckDelay > 0) + { + flushTask = new Flusher(this); + timer.schedule(flushTask, new Date(), maxAckDelay); + } + } + + protected Session createSession() + { + Session qpidSession; + if (_name == null) { - _qpidSession = _qpidConnection.createSession(1); + qpidSession = _qpidConnection.createSession(1); } else { - _qpidSession = _qpidConnection.createSession(name,1); + qpidSession = _qpidConnection.createSession(_name,1); } - _qpidSession.setSessionListener(this); if (isTransacted()) { - _qpidSession.txSelect(); - _qpidSession.setTransacted(true); + qpidSession.txSelect(); + qpidSession.setTransacted(true); } + qpidSession.setSessionListener(this); - if (maxAckDelay > 0) - { - flushTask = new Flusher(this); - timer.schedule(flushTask, new Date(), maxAckDelay); - } + return qpidSession; } + /** * Creates a new session on a connection with the default 0-10 message factory. * diff --git a/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java index 6341510c2f..6c745feea8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java @@ -88,7 +88,7 @@ public class XAResourceImpl implements AMQXAResource { if (_logger.isDebugEnabled()) { - _logger.debug("commit tx branch with xid: ", xid); + _logger.debug("commit tx branch with xid: {} ", xid); } Future<XaResult> future = _xaSession.getQpidSession().dtxCommit(convertXid(xid), b ? Option.ONE_PHASE : Option.NONE); @@ -132,7 +132,7 @@ public class XAResourceImpl implements AMQXAResource { if (_logger.isDebugEnabled()) { - _logger.debug("end tx branch with xid: ", xid); + _logger.debug("end tx branch with xid: {}", xid); } switch (flag) { @@ -191,7 +191,7 @@ public class XAResourceImpl implements AMQXAResource { if (_logger.isDebugEnabled()) { - _logger.debug("forget tx branch with xid: ", xid); + _logger.debug("forget tx branch with xid: {}", xid); } _xaSession.getQpidSession().dtxForget(convertXid(xid)); try @@ -281,7 +281,7 @@ public class XAResourceImpl implements AMQXAResource { if (_logger.isDebugEnabled()) { - _logger.debug("prepare ", xid); + _logger.debug("prepare {}", xid); } Future<XaResult> future = _xaSession.getQpidSession().dtxPrepare(convertXid(xid)); XaResult result = null; @@ -361,7 +361,7 @@ public class XAResourceImpl implements AMQXAResource { if (_logger.isDebugEnabled()) { - _logger.debug("rollback tx branch with xid: ", xid); + _logger.debug("rollback tx branch with xid: {}", xid); } Future<XaResult> future = _xaSession.getQpidSession().dtxRollback(convertXid(xid)); @@ -428,7 +428,7 @@ public class XAResourceImpl implements AMQXAResource { if (_logger.isDebugEnabled()) { - _logger.debug("start tx branch with xid: ", xid); + _logger.debug("start tx branch with xid: {}", xid); } switch (flag) { @@ -524,7 +524,7 @@ public class XAResourceImpl implements AMQXAResource // this should not happen if (_logger.isDebugEnabled()) { - _logger.debug("got unexpected status value: ", status); + _logger.debug("got unexpected status value: {}", status); } //A resource manager error has occured in the transaction branch. throw new XAException(XAException.XAER_RMERR); diff --git a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java index e01ec8578d..fa0bdcb4c9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java @@ -75,8 +75,15 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic boolean transacted, int ackMode, MessageFactoryRegistry registry, int defaultPrefetchHigh, int defaultPrefetchLow, String name) { - super(qpidConnection, con, channelId, transacted, ackMode, registry, defaultPrefetchHigh, defaultPrefetchLow, name); - createSession(); + super(qpidConnection, + con, + channelId, + transacted, + ackMode, + registry, + defaultPrefetchHigh, + defaultPrefetchLow, + name); _xaResource = new XAResourceImpl(this); } @@ -86,11 +93,13 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic /** * Create a qpid session. */ - public void createSession() + @Override + public org.apache.qpid.transport.Session createSession() { _qpidDtxSession = getQpidConnection().createSession(0,true); - _qpidDtxSession.setSessionListener(this); _qpidDtxSession.dtxSelect(); + _qpidDtxSession.setSessionListener(this); + return _qpidDtxSession; } /** @@ -101,11 +110,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic */ public Session getSession() throws JMSException { - if (_jmsSession == null) - { - _jmsSession = getAMQConnection().createSession(true, getAcknowledgeMode()); - } - return _jmsSession; + return this; } /** @@ -162,7 +167,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic */ public QueueSession getQueueSession() throws JMSException { - return (QueueSession) getSession(); + return this; } // interface XATopicSession @@ -175,7 +180,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic */ public TopicSession getTopicSession() throws JMSException { - return (TopicSession) getSession(); + return this; } @Override diff --git a/java/common/src/main/java/org/apache/qpid/transport/Struct.java b/java/common/src/main/java/org/apache/qpid/transport/Struct.java index 9b703a3117..045939e415 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Struct.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Struct.java @@ -23,7 +23,9 @@ package org.apache.qpid.transport; import org.apache.qpid.transport.codec.Decoder; import org.apache.qpid.transport.codec.Encodable; import org.apache.qpid.transport.codec.Encoder; +import org.apache.qpid.transport.util.Functions; +import java.util.Arrays; import java.util.Map; @@ -131,11 +133,24 @@ public abstract class Struct implements Encodable } str.append(me.getKey()); str.append("="); - str.append(me.getValue()); + str.append(formatValue(me.getValue())); } str.append(")"); return str.toString(); } + private Object formatValue(Object value) + { + if(value instanceof byte[]) + { + return Functions.str((byte[])value); + } + else if(value instanceof Object[]) + { + return Arrays.asList((Object[])value); + } + return value; + } + } |