summaryrefslogtreecommitdiff
path: root/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java')
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java75
1 files changed, 56 insertions, 19 deletions
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
index ea3ad7606e..5775eb4c2e 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
@@ -68,22 +68,22 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
private final Container _container;
private Principal _user;
- private static final short DEFAULT_CHANNEL_MAX = 255;
+ private static final short DEFAULT_CHANNEL_MAX = Integer.getInteger("amqp.channel_max", 255).shortValue();
private static final int DEFAULT_MAX_FRAME = Integer.getInteger("amqp.max_frame_size", 1 << 15);
private ConnectionState _state = ConnectionState.UNOPENED;
- private short _channelMax;
+ private short _channelMax = DEFAULT_CHANNEL_MAX;
private int _maxFrameSize = 4096;
private String _remoteContainerId;
private SocketAddress _remoteAddress;
// positioned by the *outgoing* channel
- private SessionEndpoint[] _sendingSessions = new SessionEndpoint[DEFAULT_CHANNEL_MAX + 1];
+ private SessionEndpoint[] _sendingSessions;
// positioned by the *incoming* channel
- private SessionEndpoint[] _receivingSessions = new SessionEndpoint[DEFAULT_CHANNEL_MAX + 1];
+ private SessionEndpoint[] _receivingSessions;
private boolean _closedForInput;
private boolean _closedForOutput;
@@ -165,7 +165,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
}
if (_state == ConnectionState.UNOPENED)
{
- sendOpen(DEFAULT_CHANNEL_MAX, DEFAULT_MAX_FRAME);
+ sendOpen(_channelMax, DEFAULT_MAX_FRAME);
_state = ConnectionState.AWAITING_OPEN;
}
}
@@ -183,10 +183,10 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
public synchronized SessionEndpoint createSession(String name)
{
// todo assert connection state
- SessionEndpoint endpoint = new SessionEndpoint(this);
short channel = getFirstFreeChannel();
if (channel != -1)
{
+ SessionEndpoint endpoint = new SessionEndpoint(this);
_sendingSessions[channel] = endpoint;
endpoint.setSendingChannel(channel);
Begin begin = new Begin();
@@ -196,13 +196,14 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
begin.setHandleMax(_handleMax);
send(channel, begin);
+ return endpoint;
}
else
{
- // todo error
+ // TODO - report error
+ return null;
}
- return endpoint;
}
@@ -235,7 +236,16 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
{
Open open = new Open();
- open.setChannelMax(UnsignedShort.valueOf(DEFAULT_CHANNEL_MAX));
+ if(_receivingSessions == null)
+ {
+ _receivingSessions = new SessionEndpoint[channelMax+1];
+ _sendingSessions = new SessionEndpoint[channelMax+1];
+ }
+ if(channelMax < _channelMax)
+ {
+ _channelMax = channelMax;
+ }
+ open.setChannelMax(UnsignedShort.valueOf(channelMax));
open.setContainerId(_container.getId());
open.setMaxFrameSize(getDesiredMaxFrameSize());
open.setHostname(getRemoteHostname());
@@ -268,7 +278,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
short getFirstFreeChannel()
{
- for (int i = 0; i < _sendingSessions.length; i++)
+ for (int i = 0; i <= _channelMax; i++)
{
if (_sendingSessions[i] == null)
{
@@ -288,10 +298,16 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
public synchronized void receiveOpen(short channel, Open open)
{
- _channelMax = open.getChannelMax() == null ? DEFAULT_CHANNEL_MAX
- : open.getChannelMax().shortValue() < DEFAULT_CHANNEL_MAX
- ? DEFAULT_CHANNEL_MAX
- : open.getChannelMax().shortValue();
+ _channelMax = open.getChannelMax() == null ? _channelMax
+ : open.getChannelMax().shortValue() < _channelMax
+ ? open.getChannelMax().shortValue()
+ : _channelMax;
+
+ if(_receivingSessions == null)
+ {
+ _receivingSessions = new SessionEndpoint[_channelMax+1];
+ _sendingSessions = new SessionEndpoint[_channelMax+1];
+ }
UnsignedInteger remoteDesiredMaxFrameSize =
open.getMaxFrameSize() == null ? UnsignedInteger.valueOf(DEFAULT_MAX_FRAME) : open.getMaxFrameSize();
@@ -380,13 +396,30 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
if (!_closedForInput)
{
_closedForInput = true;
- for (int i = 0; i < _receivingSessions.length; i++)
+ switch(_state)
+ {
+ case UNOPENED:
+ case AWAITING_OPEN:
+ case CLOSE_SENT:
+ _state = ConnectionState.CLOSED;
+ case OPEN:
+ _state = ConnectionState.CLOSE_RECEIVED;
+ case CLOSED:
+ // already sent our close - too late to do anything more
+ break;
+ default:
+ }
+
+ if(_receivingSessions != null)
{
- if (_receivingSessions[i] != null)
+ for (int i = 0; i < _receivingSessions.length; i++)
{
- _receivingSessions[i].end();
- _receivingSessions[i] = null;
+ if (_receivingSessions[i] != null)
+ {
+ _receivingSessions[i].end();
+ _receivingSessions[i] = null;
+ }
}
}
}
@@ -604,7 +637,6 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
}
}
-
public void invalidHeaderReceived()
{
// TODO
@@ -984,4 +1016,9 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
{
return _remoteError;
}
+
+ public void setChannelMax(final short channelMax)
+ {
+ _channelMax = channelMax;
+ }
}