diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2011-09-10 14:45:22 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2011-09-10 14:45:22 +0000 |
commit | 9c36740bc0cbede0ce533f137fa048363c4461da (patch) | |
tree | f83f6477344458fb72c30f2891eeb769d35b1566 | |
parent | 5cf96f160dd6c390ed4f40927766d61d7120f6c0 (diff) | |
download | qpid-python-9c36740bc0cbede0ce533f137fa048363c4461da.tar.gz |
NO-JIRA : Fix 1-0 sandox breakage caused by merge
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1167527 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 92 insertions, 457 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPSASLEngine_1_0_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPSASLEngine_1_0_0.java deleted file mode 100644 index ebd93b1185..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPSASLEngine_1_0_0.java +++ /dev/null @@ -1,339 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol; - -import org.apache.qpid.amqp_1_0.codec.FrameWriter; -import org.apache.qpid.amqp_1_0.framing.AMQFrame; -import org.apache.qpid.amqp_1_0.framing.FrameHandler; -import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; -import org.apache.qpid.amqp_1_0.transport.Container; -import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler; -import org.apache.qpid.amqp_1_0.type.FrameBody; - -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.server.configuration.ConfigStore; -import org.apache.qpid.server.configuration.ConnectionConfigType; -import org.apache.qpid.server.protocol.v1_0.Connection_1_0; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.transport.network.NetworkConnection; - -import java.io.PrintWriter; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.security.Principal; -import java.util.UUID; - -public class AMQPSASLEngine_1_0_0 implements ProtocolEngine, FrameOutputHandler -{ - public static final int MAX_FRAME_SIZE = 64 * 1024 - 1; - - private NetworkConnection _networkDriver; - private long _readBytes; - private long _writtenBytes; - private final UUID _id; - private final IApplicationRegistry _appRegistry; - private long _createTime = System.currentTimeMillis(); - - private static final int BUF_SIZE = 8; - private static final ByteBuffer HEADER = - ByteBuffer.wrap(new byte[] - { - (byte)'A', - (byte)'M', - (byte)'Q', - (byte)'P', - (byte) 0, - (byte) 1, - (byte) 0, - (byte) 0 - }); - - private FrameWriter _frameWriter; - private FrameHandler _frameHandler; - private ByteBuffer _buf = ByteBuffer.allocate(1024 * 1024); - private Object _sendLock = new Object(); - private byte _major; - private byte _minor; - private byte _revision; - private ConnectionEndpoint _conn; - - - static enum State { - A, - M, - Q, - P, - PROTOCOL, - MAJOR, - MINOR, - REVISION, - FRAME - } - - private State _state = State.A; - - public AMQPSASLEngine_1_0_0(NetworkConnection networkDriver, - final IApplicationRegistry appRegistry) - { - - - _id = appRegistry.getConfigStore().createId(); - _appRegistry = appRegistry; - setNetworkDriver(networkDriver); - - - // FIXME Two log messages to maintain compatinbility with earlier protocol versions -// _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false)); -// _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true)); - } - - public void setNetworkDriver(NetworkConnection driver) - { - _networkDriver = driver; - - - _conn = new ConnectionEndpoint(new Container(),_appRegistry.getAuthenticationManager()); - _conn.setConnectionEventListener(new Connection_1_0(_appRegistry)); - _conn.setFrameOutputHandler(this); - - - _frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry()); - _frameHandler = new FrameHandler(_conn); - - _networkDriver.getSender().send(HEADER.duplicate()); - - - - } - - public SocketAddress getRemoteAddress() - { - return _networkDriver.getRemoteAddress(); - } - - public SocketAddress getLocalAddress() - { - return _networkDriver.getLocalAddress(); - } - - public long getReadBytes() - { - return _readBytes; - } - - public long getWrittenBytes() - { - return _writtenBytes; - } - - public void writerIdle() - { - //Todo - } - - public void readerIdle() - { - //Todo - } - - public String getAddress() - { - return getRemoteAddress().toString(); - } - - - public ConfigStore getConfigStore() - { - return _appRegistry.getConfigStore(); - } - - public UUID getId() - { - return _id; - } - - public ConnectionConfigType getConfigType() - { - return ConnectionConfigType.getInstance(); - } - - public boolean isDurable() - { - return false; - } - - public synchronized void received(ByteBuffer msg) - { - _readBytes += msg.remaining(); - switch(_state) - { - case A: - if(msg.hasRemaining()) - { - msg.get(); - } - else - { - break; - } - case M: - if(msg.hasRemaining()) - { - msg.get(); - } - else - { - _state = State.M; - break; - } - - case Q: - if(msg.hasRemaining()) - { - msg.get(); - } - else - { - _state = State.Q; - break; - } - case P: - if(msg.hasRemaining()) - { - msg.get(); - } - else - { - _state = State.P; - break; - } - case PROTOCOL: - if(msg.hasRemaining()) - { - msg.get(); - } - else - { - _state = State.PROTOCOL; - break; - } - case MAJOR: - if(msg.hasRemaining()) - { - _major = msg.get(); - } - else - { - _state = State.MAJOR; - break; - } - case MINOR: - if(msg.hasRemaining()) - { - _minor = msg.get(); - } - else - { - _state = State.MINOR; - break; - } - case REVISION: - if(msg.hasRemaining()) - { - _revision = msg.get(); - - _state = State.FRAME; - } - else - { - _state = State.REVISION; - break; - } - case FRAME: - _frameHandler.parse(msg); - } - - } - - public void exception(Throwable t) - { - t.printStackTrace(); - } - - public void closed() - { - // todo - - } - - public long getCreateTime() - { - return _createTime; - } - - - public boolean canSend() - { - return true; - } - - public void send(AMQFrame frame) - { - send(frame, null); - } - public void send(AMQFrame frame, ByteBuffer buffer) - { - - synchronized(_sendLock) - { - - if(_buf.remaining() < MAX_FRAME_SIZE) - { - _buf = ByteBuffer.allocate(1024*1024); - } - - _frameWriter.setValue(frame); - - ByteBuffer dup = _buf.slice(); - - _frameWriter.writeToBuffer(dup); - - _buf.position(_buf.position()+dup.position()); - - dup.flip(); - _writtenBytes += dup.limit(); - _networkDriver.getSender().send(dup); - - } - } - - public void close() - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void setLogOutput(final PrintWriter out) - { - //TODO - } - - -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index 3852253058..a71d396919 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -311,7 +311,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine public ServerProtocolEngine getProtocolEngine() { - return new ProtocolEngine_1_0_0(_network, _appRegistry); + return new ProtocolEngine_1_0_0(_appRegistry); } }; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java index 18b1b71d69..6c07e4912f 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java @@ -32,6 +32,7 @@ import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.configuration.*; import org.apache.qpid.server.protocol.v1_0.Connection_1_0; import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; import java.io.PrintWriter; @@ -46,9 +47,7 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa { static final AtomicLong _connectionIdSource = new AtomicLong(0L); - public static final int MAX_FRAME_SIZE = 64 * 1024 - 1; - - private NetworkConnection _networkDriver; + //private NetworkConnection _networkDriver; private long _readBytes; private long _writtenBytes; private final UUID _id; @@ -57,7 +56,6 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa private ConnectionEndpoint _conn; private final long _connectionId = _connectionIdSource.getAndIncrement(); - private static final int BUF_SIZE = 8; private static final ByteBuffer HEADER = ByteBuffer.wrap(new byte[] { @@ -73,11 +71,12 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa private FrameWriter _frameWriter; private FrameHandler _frameHandler; - private ByteBuffer _buf = ByteBuffer.allocate(1024 * 1024); private Object _sendLock = new Object(); private byte _major; private byte _minor; private byte _revision; + private NetworkConnection _network; + private Sender<ByteBuffer> _sender; static enum State { @@ -94,56 +93,21 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa private State _state = State.A; - public ProtocolEngine_1_0_0(NetworkConnection networkDriver, - final IApplicationRegistry appRegistry) + public ProtocolEngine_1_0_0(final IApplicationRegistry appRegistry) { - - _id = appRegistry.getConfigStore().createId(); _appRegistry = appRegistry; - setNetworkDriver(networkDriver); - - - // FIXME Two log messages to maintain compatinbility with earlier protocol versions -// _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false)); -// _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true)); } - public void setNetworkDriver(NetworkConnection driver) - { - _networkDriver = driver; - Container container = new Container(); - - Principal principal = new Principal() - { - - public String getName() - { - // TODO - return "rob"; - } - }; - _conn = new ConnectionEndpoint(container,_appRegistry.getAuthenticationManager()); - _conn.setConnectionEventListener(new Connection_1_0(_appRegistry)); - _conn.setFrameOutputHandler(this); - _conn.setRemoteAddress(driver.getRemoteAddress()); - - - _frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry()); - _frameHandler = new FrameHandler(_conn); - - _networkDriver.getSender().send(HEADER.duplicate()); - - } public SocketAddress getRemoteAddress() { - return _networkDriver.getRemoteAddress(); + return _network.getRemoteAddress(); } public SocketAddress getLocalAddress() { - return _networkDriver.getLocalAddress(); + return _network.getLocalAddress(); } public long getReadBytes() @@ -166,6 +130,25 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa //Todo } + public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender) + { + _network = network; + _sender = sender; + + Container container = new Container(); + + _conn = new ConnectionEndpoint(container,_appRegistry.getAuthenticationManager()); + _conn.setConnectionEventListener(new Connection_1_0(_appRegistry)); + _conn.setFrameOutputHandler(this); + _conn.setRemoteAddress(_network.getRemoteAddress()); + + _frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry()); + _frameHandler = new FrameHandler(_conn); + + _sender.send(HEADER.duplicate()); + _sender.flush(); + } + public String getAddress() { return getRemoteAddress().toString(); @@ -338,26 +321,19 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody()); } - if(_buf.remaining() < _conn.getMaxFrameSize()) - { - _buf = ByteBuffer.allocate(Math.min(_conn.getMaxFrameSize(),1024*1024)); - } + _frameWriter.setValue(amqFrame); ByteBuffer dup = ByteBuffer.allocate(_conn.getMaxFrameSize()); - // int pos = _buf.position(); int size = _frameWriter.writeToBuffer(dup); if(size > _conn.getMaxFrameSize()) { -// _buf.position(pos); throw new OversizeFrameException(amqFrame,size); } -// _buf.position(_buf.position()+dup.position()); - dup.flip(); _writtenBytes += dup.limit(); @@ -371,7 +347,8 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa } - _networkDriver.getSender().send(dup); + _sender.send(dup); + _sender.flush(); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java index c8041f35bb..a735c5bef3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java @@ -38,6 +38,7 @@ import org.apache.qpid.server.configuration.ConnectionConfigType; import org.apache.qpid.server.protocol.v1_0.Connection_1_0; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; import java.io.PrintWriter; @@ -50,7 +51,6 @@ import java.util.logging.Logger; public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOutputHandler { - private NetworkConnection _networkDriver; private long _readBytes; private long _writtenBytes; private final UUID _id; @@ -59,7 +59,6 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut private ConnectionEndpoint _conn; private long _connectionId = ProtocolEngine_1_0_0._connectionIdSource.getAndIncrement(); - private static final int BUF_SIZE = 8; private static final ByteBuffer HEADER = ByteBuffer.wrap(new byte[] { @@ -95,6 +94,8 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut private byte _minor; private byte _revision; private PrintWriter _out; + private NetworkConnection _network; + private Sender<ByteBuffer> _sender; static enum State { @@ -116,23 +117,46 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut { _id = appRegistry.getConfigStore().createId(); _appRegistry = appRegistry; - setNetworkDriver(networkDriver); } - public void setNetworkDriver(final NetworkConnection driver) + + public SocketAddress getRemoteAddress() { - _networkDriver = driver; - Container container = new Container(); + return _network.getRemoteAddress(); + } - Principal principal = new Principal() - { + public SocketAddress getLocalAddress() + { + return _network.getLocalAddress(); + } + + public long getReadBytes() + { + return _readBytes; + } + + public long getWrittenBytes() + { + return _writtenBytes; + } + + public void writerIdle() + { + //Todo + } + + public void readerIdle() + { + //Todo + } + + public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender) + { + _network = network; + _sender = sender; + + Container container = new Container(); - public String getName() - { - // TODO - return "rob"; - } - }; _conn = new ConnectionEndpoint(container, ApplicationRegistry.getInstance().getAuthenticationManager()); _conn.setConnectionEventListener(new Connection_1_0(_appRegistry)); _conn.setRemoteAddress(getRemoteAddress()); @@ -149,51 +173,24 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut { if(_conn.isAuthenticated()) { - _networkDriver.getSender().send(PROTOCOL_HEADER.duplicate()); + _sender.send(PROTOCOL_HEADER.duplicate()); + _sender.flush(); } else { - _networkDriver.close(); + _network.close(); } } }); _frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry()); _frameHandler = new SASLFrameHandler(_conn); - _networkDriver.getSender().send(HEADER.duplicate()); + _sender.send(HEADER.duplicate()); + _sender.flush(); _conn.initiateSASL(); - } - public SocketAddress getRemoteAddress() - { - return _networkDriver.getRemoteAddress(); - } - - public SocketAddress getLocalAddress() - { - return _networkDriver.getLocalAddress(); - } - - public long getReadBytes() - { - return _readBytes; - } - - public long getWrittenBytes() - { - return _writtenBytes; - } - - public void writerIdle() - { - //Todo - } - - public void readerIdle() - { - //Todo } public String getAddress() @@ -366,41 +363,41 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut synchronized(_sendLock) { - if(FRAME_LOGGER.isLoggable(Level.FINE)) - { - FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody()); - } - if(_buf.remaining() < _conn.getMaxFrameSize()) + + if(FRAME_LOGGER.isLoggable(Level.FINE)) { - _buf = ByteBuffer.allocate(Math.min(_conn.getMaxFrameSize(),1024*1024)); + FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody()); } + + _frameWriter.setValue(amqFrame); - ByteBuffer dup = _buf.slice(); - int pos = _buf.position(); + + + ByteBuffer dup = ByteBuffer.allocate(_conn.getMaxFrameSize()); int size = _frameWriter.writeToBuffer(dup); if(size > _conn.getMaxFrameSize()) { - _buf.position(pos); throw new OversizeFrameException(amqFrame,size); } - _buf.position(_buf.position()+dup.position()); - dup.flip(); _writtenBytes += dup.limit(); if(RAW_LOGGER.isLoggable(Level.FINE)) - { - ByteBuffer dup2 = dup.duplicate(); - byte[] data = new byte[dup2.remaining()]; - dup2.get(data); - Binary bin = new Binary(data); - RAW_LOGGER.fine("SEND[" + getRemoteAddress() + "] : " + bin.toString()); - } + { + ByteBuffer dup2 = dup.duplicate(); + byte[] data = new byte[dup2.remaining()]; + dup2.get(data); + Binary bin = new Binary(data); + RAW_LOGGER.fine("SEND[" + getRemoteAddress() + "] : " + bin.toString()); + } + + + _sender.send(dup); + _sender.flush(); - _networkDriver.getSender().send(dup); } } |