diff options
Diffstat (limited to 'qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java')
-rw-r--r-- | qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java | 75 |
1 files changed, 56 insertions, 19 deletions
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java index ea3ad7606e..5775eb4c2e 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java @@ -68,22 +68,22 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour private final Container _container; private Principal _user; - private static final short DEFAULT_CHANNEL_MAX = 255; + private static final short DEFAULT_CHANNEL_MAX = Integer.getInteger("amqp.channel_max", 255).shortValue(); private static final int DEFAULT_MAX_FRAME = Integer.getInteger("amqp.max_frame_size", 1 << 15); private ConnectionState _state = ConnectionState.UNOPENED; - private short _channelMax; + private short _channelMax = DEFAULT_CHANNEL_MAX; private int _maxFrameSize = 4096; private String _remoteContainerId; private SocketAddress _remoteAddress; // positioned by the *outgoing* channel - private SessionEndpoint[] _sendingSessions = new SessionEndpoint[DEFAULT_CHANNEL_MAX + 1]; + private SessionEndpoint[] _sendingSessions; // positioned by the *incoming* channel - private SessionEndpoint[] _receivingSessions = new SessionEndpoint[DEFAULT_CHANNEL_MAX + 1]; + private SessionEndpoint[] _receivingSessions; private boolean _closedForInput; private boolean _closedForOutput; @@ -165,7 +165,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour } if (_state == ConnectionState.UNOPENED) { - sendOpen(DEFAULT_CHANNEL_MAX, DEFAULT_MAX_FRAME); + sendOpen(_channelMax, DEFAULT_MAX_FRAME); _state = ConnectionState.AWAITING_OPEN; } } @@ -183,10 +183,10 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour public synchronized SessionEndpoint createSession(String name) { // todo assert connection state - SessionEndpoint endpoint = new SessionEndpoint(this); short channel = getFirstFreeChannel(); if (channel != -1) { + SessionEndpoint endpoint = new SessionEndpoint(this); _sendingSessions[channel] = endpoint; endpoint.setSendingChannel(channel); Begin begin = new Begin(); @@ -196,13 +196,14 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour begin.setHandleMax(_handleMax); send(channel, begin); + return endpoint; } else { - // todo error + // TODO - report error + return null; } - return endpoint; } @@ -235,7 +236,16 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour { Open open = new Open(); - open.setChannelMax(UnsignedShort.valueOf(DEFAULT_CHANNEL_MAX)); + if(_receivingSessions == null) + { + _receivingSessions = new SessionEndpoint[channelMax+1]; + _sendingSessions = new SessionEndpoint[channelMax+1]; + } + if(channelMax < _channelMax) + { + _channelMax = channelMax; + } + open.setChannelMax(UnsignedShort.valueOf(channelMax)); open.setContainerId(_container.getId()); open.setMaxFrameSize(getDesiredMaxFrameSize()); open.setHostname(getRemoteHostname()); @@ -268,7 +278,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour short getFirstFreeChannel() { - for (int i = 0; i < _sendingSessions.length; i++) + for (int i = 0; i <= _channelMax; i++) { if (_sendingSessions[i] == null) { @@ -288,10 +298,16 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour public synchronized void receiveOpen(short channel, Open open) { - _channelMax = open.getChannelMax() == null ? DEFAULT_CHANNEL_MAX - : open.getChannelMax().shortValue() < DEFAULT_CHANNEL_MAX - ? DEFAULT_CHANNEL_MAX - : open.getChannelMax().shortValue(); + _channelMax = open.getChannelMax() == null ? _channelMax + : open.getChannelMax().shortValue() < _channelMax + ? open.getChannelMax().shortValue() + : _channelMax; + + if(_receivingSessions == null) + { + _receivingSessions = new SessionEndpoint[_channelMax+1]; + _sendingSessions = new SessionEndpoint[_channelMax+1]; + } UnsignedInteger remoteDesiredMaxFrameSize = open.getMaxFrameSize() == null ? UnsignedInteger.valueOf(DEFAULT_MAX_FRAME) : open.getMaxFrameSize(); @@ -380,13 +396,30 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour if (!_closedForInput) { _closedForInput = true; - for (int i = 0; i < _receivingSessions.length; i++) + switch(_state) + { + case UNOPENED: + case AWAITING_OPEN: + case CLOSE_SENT: + _state = ConnectionState.CLOSED; + case OPEN: + _state = ConnectionState.CLOSE_RECEIVED; + case CLOSED: + // already sent our close - too late to do anything more + break; + default: + } + + if(_receivingSessions != null) { - if (_receivingSessions[i] != null) + for (int i = 0; i < _receivingSessions.length; i++) { - _receivingSessions[i].end(); - _receivingSessions[i] = null; + if (_receivingSessions[i] != null) + { + _receivingSessions[i].end(); + _receivingSessions[i] = null; + } } } } @@ -604,7 +637,6 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour } } - public void invalidHeaderReceived() { // TODO @@ -984,4 +1016,9 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour { return _remoteError; } + + public void setChannelMax(final short channelMax) + { + _channelMax = channelMax; + } } |