summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-06-19 10:17:51 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-06-19 10:17:51 +0000
commit118c5c885463451ba8032fa109225c18a08efa45 (patch)
tree44c82228cb94ab4fa3b130ee0ac991aa8179951a
parent9344716446e05be74014a1e7f049ddf13edde03f (diff)
downloadqpid-python-118c5c885463451ba8032fa109225c18a08efa45.tar.gz
Merge from trunk up to r1494530
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-vhost-refactor@1494534 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/DeliveryAnnotationsWriter.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java35
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java14
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java27
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Struct.java17
5 files changed, 63 insertions, 32 deletions
diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/DeliveryAnnotationsWriter.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/DeliveryAnnotationsWriter.java
index e22c26b290..f3404d78aa 100644
--- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/DeliveryAnnotationsWriter.java
+++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/DeliveryAnnotationsWriter.java
@@ -60,7 +60,7 @@ public class DeliveryAnnotationsWriter extends AbstractDescribedTypeWriter<Deliv
@Override
protected ValueWriter createDescribedWriter()
{
- return getRegistry().getValueWriter(_value);
+ return getRegistry().getValueWriter(_value.getValue());
}
private static Factory<DeliveryAnnotations> FACTORY = new Factory<DeliveryAnnotations>()
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 1baaff738b..6b87316e87 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -24,7 +24,6 @@ import static org.apache.qpid.transport.Option.UNRELIABLE;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
@@ -95,6 +94,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_10.class);
private static Timer timer = new Timer("ack-flusher", true);
+ private final String _name;
private static class Flusher extends TimerTask
{
@@ -153,6 +153,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
private boolean _isHardError = Boolean.getBoolean("qpid.session.legacy_exception_behaviour");
//--- constructors
+
/**
* Creates a new session on a connection.
*
@@ -173,28 +174,38 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark,
defaultPrefetchLowMark);
_qpidConnection = qpidConnection;
- if (name == null)
+ _name = name;
+ _qpidSession = createSession();
+
+ if (maxAckDelay > 0)
+ {
+ flushTask = new Flusher(this);
+ timer.schedule(flushTask, new Date(), maxAckDelay);
+ }
+ }
+
+ protected Session createSession()
+ {
+ Session qpidSession;
+ if (_name == null)
{
- _qpidSession = _qpidConnection.createSession(1);
+ qpidSession = _qpidConnection.createSession(1);
}
else
{
- _qpidSession = _qpidConnection.createSession(name,1);
+ qpidSession = _qpidConnection.createSession(_name,1);
}
- _qpidSession.setSessionListener(this);
if (isTransacted())
{
- _qpidSession.txSelect();
- _qpidSession.setTransacted(true);
+ qpidSession.txSelect();
+ qpidSession.setTransacted(true);
}
+ qpidSession.setSessionListener(this);
- if (maxAckDelay > 0)
- {
- flushTask = new Flusher(this);
- timer.schedule(flushTask, new Date(), maxAckDelay);
- }
+ return qpidSession;
}
+
/**
* Creates a new session on a connection with the default 0-10 message factory.
*
diff --git a/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
index 6341510c2f..6c745feea8 100644
--- a/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
@@ -88,7 +88,7 @@ public class XAResourceImpl implements AMQXAResource
{
if (_logger.isDebugEnabled())
{
- _logger.debug("commit tx branch with xid: ", xid);
+ _logger.debug("commit tx branch with xid: {} ", xid);
}
Future<XaResult> future =
_xaSession.getQpidSession().dtxCommit(convertXid(xid), b ? Option.ONE_PHASE : Option.NONE);
@@ -132,7 +132,7 @@ public class XAResourceImpl implements AMQXAResource
{
if (_logger.isDebugEnabled())
{
- _logger.debug("end tx branch with xid: ", xid);
+ _logger.debug("end tx branch with xid: {}", xid);
}
switch (flag)
{
@@ -191,7 +191,7 @@ public class XAResourceImpl implements AMQXAResource
{
if (_logger.isDebugEnabled())
{
- _logger.debug("forget tx branch with xid: ", xid);
+ _logger.debug("forget tx branch with xid: {}", xid);
}
_xaSession.getQpidSession().dtxForget(convertXid(xid));
try
@@ -281,7 +281,7 @@ public class XAResourceImpl implements AMQXAResource
{
if (_logger.isDebugEnabled())
{
- _logger.debug("prepare ", xid);
+ _logger.debug("prepare {}", xid);
}
Future<XaResult> future = _xaSession.getQpidSession().dtxPrepare(convertXid(xid));
XaResult result = null;
@@ -361,7 +361,7 @@ public class XAResourceImpl implements AMQXAResource
{
if (_logger.isDebugEnabled())
{
- _logger.debug("rollback tx branch with xid: ", xid);
+ _logger.debug("rollback tx branch with xid: {}", xid);
}
Future<XaResult> future = _xaSession.getQpidSession().dtxRollback(convertXid(xid));
@@ -428,7 +428,7 @@ public class XAResourceImpl implements AMQXAResource
{
if (_logger.isDebugEnabled())
{
- _logger.debug("start tx branch with xid: ", xid);
+ _logger.debug("start tx branch with xid: {}", xid);
}
switch (flag)
{
@@ -524,7 +524,7 @@ public class XAResourceImpl implements AMQXAResource
// this should not happen
if (_logger.isDebugEnabled())
{
- _logger.debug("got unexpected status value: ", status);
+ _logger.debug("got unexpected status value: {}", status);
}
//A resource manager error has occured in the transaction branch.
throw new XAException(XAException.XAER_RMERR);
diff --git a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
index e01ec8578d..fa0bdcb4c9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
@@ -75,8 +75,15 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
boolean transacted, int ackMode, MessageFactoryRegistry registry, int defaultPrefetchHigh, int defaultPrefetchLow,
String name)
{
- super(qpidConnection, con, channelId, transacted, ackMode, registry, defaultPrefetchHigh, defaultPrefetchLow, name);
- createSession();
+ super(qpidConnection,
+ con,
+ channelId,
+ transacted,
+ ackMode,
+ registry,
+ defaultPrefetchHigh,
+ defaultPrefetchLow,
+ name);
_xaResource = new XAResourceImpl(this);
}
@@ -86,11 +93,13 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
/**
* Create a qpid session.
*/
- public void createSession()
+ @Override
+ public org.apache.qpid.transport.Session createSession()
{
_qpidDtxSession = getQpidConnection().createSession(0,true);
- _qpidDtxSession.setSessionListener(this);
_qpidDtxSession.dtxSelect();
+ _qpidDtxSession.setSessionListener(this);
+ return _qpidDtxSession;
}
/**
@@ -101,11 +110,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
*/
public Session getSession() throws JMSException
{
- if (_jmsSession == null)
- {
- _jmsSession = getAMQConnection().createSession(true, getAcknowledgeMode());
- }
- return _jmsSession;
+ return this;
}
/**
@@ -162,7 +167,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
*/
public QueueSession getQueueSession() throws JMSException
{
- return (QueueSession) getSession();
+ return this;
}
// interface XATopicSession
@@ -175,7 +180,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
*/
public TopicSession getTopicSession() throws JMSException
{
- return (TopicSession) getSession();
+ return this;
}
@Override
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Struct.java b/java/common/src/main/java/org/apache/qpid/transport/Struct.java
index 9b703a3117..045939e415 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Struct.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Struct.java
@@ -23,7 +23,9 @@ package org.apache.qpid.transport;
import org.apache.qpid.transport.codec.Decoder;
import org.apache.qpid.transport.codec.Encodable;
import org.apache.qpid.transport.codec.Encoder;
+import org.apache.qpid.transport.util.Functions;
+import java.util.Arrays;
import java.util.Map;
@@ -131,11 +133,24 @@ public abstract class Struct implements Encodable
}
str.append(me.getKey());
str.append("=");
- str.append(me.getValue());
+ str.append(formatValue(me.getValue()));
}
str.append(")");
return str.toString();
}
+ private Object formatValue(Object value)
+ {
+ if(value instanceof byte[])
+ {
+ return Functions.str((byte[])value);
+ }
+ else if(value instanceof Object[])
+ {
+ return Arrays.asList((Object[])value);
+ }
+ return value;
+ }
+
}