summaryrefslogtreecommitdiff
path: root/java/amqp-1-0-common/src/main/java/org
diff options
context:
space:
mode:
Diffstat (limited to 'java/amqp-1-0-common/src/main/java/org')
-rw-r--r--java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java27
-rw-r--r--java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQFrame.java8
-rw-r--r--java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java23
-rw-r--r--java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java13
-rw-r--r--java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java4
-rw-r--r--java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java24
6 files changed, 82 insertions, 17 deletions
diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java
index dbf9306366..95e327852b 100644
--- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java
+++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java
@@ -77,13 +77,21 @@ public class FrameWriter implements ValueWriter<AMQFrame>
{
case SIZE_0:
- _typeWriter.setValue(_frame.getFrameBody());
-
int payloadLength = _payload == null ? 0 : _payload.remaining();
- _size = _typeWriter.writeToBuffer(remaining > 8
- ? (ByteBuffer)buffer.duplicate().position(buffer.position()+8)
- : ByteBuffer.wrap(EMPTY_BYTE_ARRAY)) + 8 + payloadLength;
+ if(_typeWriter!=null)
+ {
+ _typeWriter.setValue(_frame.getFrameBody());
+
+
+ _size = _typeWriter.writeToBuffer(remaining > 8
+ ? (ByteBuffer)buffer.duplicate().position(buffer.position()+8)
+ : ByteBuffer.wrap(EMPTY_BYTE_ARRAY)) + 8 + payloadLength;
+ }
+ else
+ {
+ _size = 8 + payloadLength;
+ }
if(remaining >= 4)
{
buffer.putInt(_size);
@@ -239,7 +247,14 @@ public class FrameWriter implements ValueWriter<AMQFrame>
_size = -1;
_payload = null;
final Object frameBody = frame.getFrameBody();
- _typeWriter = _registry.getValueWriter(frameBody);
+ if(frameBody!=null)
+ {
+ _typeWriter = _registry.getValueWriter(frameBody);
+ }
+ else
+ {
+ _typeWriter = null;
+ }
_payload = frame.getPayload() == null ? null : frame.getPayload().duplicate();
}
}
diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQFrame.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQFrame.java
index 769fe13d29..9684e290f4 100644
--- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQFrame.java
+++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQFrame.java
@@ -21,6 +21,7 @@
package org.apache.qpid.amqp_1_0.framing;
+import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.FrameBody;
import java.nio.ByteBuffer;
@@ -65,4 +66,11 @@ public abstract class AMQFrame<T>
return _frameBody;
}
+ @Override
+ public String toString()
+ {
+ return "AMQFrame{" +
+ "frameBody=" + _frameBody +
+ '}';
+ }
}
diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
index 78bed8a71e..f4cd06f3ef 100644
--- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
+++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
@@ -103,6 +103,7 @@ public class ConnectionHandler
private boolean _setForClose;
private boolean _closed;
+ private long _nextHeartbeat;
public FrameOutput(final ConnectionEndpoint conn)
{
@@ -165,14 +166,34 @@ public class ConnectionHandler
{
synchronized(_conn.getLock())
{
+ long time = System.currentTimeMillis();
try
{
AMQFrame frame = null;
while(!closed() && (frame = _queue.poll()) == null && wait)
{
- _conn.getLock().wait();
+ _conn.getLock().wait(_conn.getIdleTimeout()/2);
+
+ if(_conn.getIdleTimeout()>0)
+ {
+ time = System.currentTimeMillis();
+
+ if(frame == null && time > _nextHeartbeat)
+ {
+ frame = new TransportFrame((short) 0,null);
+ break;
+ }
+ }
}
+
+
+
+ if(frame != null)
+ {
+ _nextHeartbeat = time + _conn.getIdleTimeout()/2;
+
+ }
if(frame == _endOfFrameMarker)
{
_closed = true;
diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
index 70e990d92e..17bc2caf5f 100644
--- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
+++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
@@ -81,6 +81,8 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
private boolean _closedForInput;
private boolean _closedForOutput;
+ private long _idleTimeout;
+
private AMQPDescribedTypeRegistry _describedTypeRegistry = AMQPDescribedTypeRegistry.newInstance()
.registerTransportLayer()
.registerMessagingLayer()
@@ -282,6 +284,11 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
_remoteContainerId = open.getContainerId();
+ if(open.getIdleTimeOut() != null)
+ {
+ _idleTimeout = open.getIdleTimeOut().longValue();
+ }
+
switch(_state)
{
case UNOPENED:
@@ -316,6 +323,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
sendClose(new Close());
break;
case CLOSE_SENT:
+
default:
}
}
@@ -650,6 +658,11 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
return this;
}
+ public synchronized long getIdleTimeout()
+ {
+ return _idleTimeout;
+ }
+
public synchronized void close()
{
switch(_state)
diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java
index 4135199045..aca781afb9 100644
--- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java
+++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java
@@ -71,6 +71,10 @@ public class Delivery
{
setComplete(true);
}
+ if(Boolean.TRUE.equals(transfer.getSettled()))
+ {
+ setSettled(true);
+ }
}
public List<Transfer> getTransfers()
diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java
index cf86fc2471..5fbca0b695 100644
--- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java
+++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java
@@ -113,33 +113,37 @@ public class ReceivingLinkEndpoint extends LinkEndpoint<ReceivingLinkListener>
synchronized (getLock())
{
TransientState transientState;
- boolean existingState = _unsettledMap.containsKey(transfer.getDeliveryTag());
- _unsettledMap.put(transfer.getDeliveryTag(), transfer.getState());
+ final Binary deliveryTag = delivery.getDeliveryTag();
+ boolean existingState = _unsettledMap.containsKey(deliveryTag);
+ if(!existingState || transfer.getState() != null)
+ {
+ _unsettledMap.put(deliveryTag, transfer.getState());
+ }
if(!existingState)
{
transientState = new TransientState(transfer.getDeliveryId());
- if(Boolean.TRUE.equals(transfer.getSettled()))
+ if(delivery.isSettled())
{
transientState.setSettled(true);
}
- _unsettledIds.put(transfer.getDeliveryTag(), transientState);
+ _unsettledIds.put(deliveryTag, transientState);
setLinkCredit(getLinkCredit().subtract(UnsignedInteger.ONE));
setDeliveryCount(getDeliveryCount().add(UnsignedInteger.ONE));
}
else
{
- transientState = _unsettledIds.get(transfer.getDeliveryTag());
+ transientState = _unsettledIds.get(deliveryTag);
transientState.incrementCredit();
- if(Boolean.TRUE.equals(transfer.getSettled()))
+ if(delivery.isSettled())
{
transientState.setSettled(true);
}
}
- if(transientState.isSettled())
+ if(transientState.isSettled() && delivery.isComplete())
{
- _unsettledMap.remove(transfer.getDeliveryTag());
+ _unsettledMap.remove(deliveryTag);
}
getLinkEventListener().messageTransfer(transfer);
@@ -155,7 +159,7 @@ public class ReceivingLinkEndpoint extends LinkEndpoint<ReceivingLinkListener>
super.receiveFlow(flow);
_remoteDrain = Boolean.TRUE.equals((Boolean)flow.getDrain());
setAvailable(flow.getAvailable());
- _remoteTransferCount = flow.getDeliveryCount();
+ setDeliveryCount(flow.getDeliveryCount());
getLock().notifyAll();
}
}
@@ -371,7 +375,7 @@ public class ReceivingLinkEndpoint extends LinkEndpoint<ReceivingLinkListener>
tag = iter.next();
tagsToUpdate.add(tag);
- deliveryId = _unsettledIds.get(firstTag).getDeliveryId();
+ deliveryId = _unsettledIds.get(tag).getDeliveryId();
if(deliveryId.equals(last.add(UnsignedInteger.ONE)))
{