summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2011-09-10 14:45:22 +0000
committerRobert Godfrey <rgodfrey@apache.org>2011-09-10 14:45:22 +0000
commit9c36740bc0cbede0ce533f137fa048363c4461da (patch)
treef83f6477344458fb72c30f2891eeb769d35b1566
parent5cf96f160dd6c390ed4f40927766d61d7120f6c0 (diff)
downloadqpid-python-9c36740bc0cbede0ce533f137fa048363c4461da.tar.gz
NO-JIRA : Fix 1-0 sandox breakage caused by merge
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1167527 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPSASLEngine_1_0_0.java339
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java2
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java81
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java127
4 files changed, 92 insertions, 457 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPSASLEngine_1_0_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPSASLEngine_1_0_0.java
deleted file mode 100644
index ebd93b1185..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPSASLEngine_1_0_0.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol;
-
-import org.apache.qpid.amqp_1_0.codec.FrameWriter;
-import org.apache.qpid.amqp_1_0.framing.AMQFrame;
-import org.apache.qpid.amqp_1_0.framing.FrameHandler;
-import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
-import org.apache.qpid.amqp_1_0.transport.Container;
-import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
-import org.apache.qpid.amqp_1_0.type.FrameBody;
-
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.configuration.ConnectionConfigType;
-import org.apache.qpid.server.protocol.v1_0.Connection_1_0;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.transport.network.NetworkConnection;
-
-import java.io.PrintWriter;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.security.Principal;
-import java.util.UUID;
-
-public class AMQPSASLEngine_1_0_0 implements ProtocolEngine, FrameOutputHandler
-{
- public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
-
- private NetworkConnection _networkDriver;
- private long _readBytes;
- private long _writtenBytes;
- private final UUID _id;
- private final IApplicationRegistry _appRegistry;
- private long _createTime = System.currentTimeMillis();
-
- private static final int BUF_SIZE = 8;
- private static final ByteBuffer HEADER =
- ByteBuffer.wrap(new byte[]
- {
- (byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte) 0,
- (byte) 1,
- (byte) 0,
- (byte) 0
- });
-
- private FrameWriter _frameWriter;
- private FrameHandler _frameHandler;
- private ByteBuffer _buf = ByteBuffer.allocate(1024 * 1024);
- private Object _sendLock = new Object();
- private byte _major;
- private byte _minor;
- private byte _revision;
- private ConnectionEndpoint _conn;
-
-
- static enum State {
- A,
- M,
- Q,
- P,
- PROTOCOL,
- MAJOR,
- MINOR,
- REVISION,
- FRAME
- }
-
- private State _state = State.A;
-
- public AMQPSASLEngine_1_0_0(NetworkConnection networkDriver,
- final IApplicationRegistry appRegistry)
- {
-
-
- _id = appRegistry.getConfigStore().createId();
- _appRegistry = appRegistry;
- setNetworkDriver(networkDriver);
-
-
- // FIXME Two log messages to maintain compatinbility with earlier protocol versions
-// _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false));
-// _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true));
- }
-
- public void setNetworkDriver(NetworkConnection driver)
- {
- _networkDriver = driver;
-
-
- _conn = new ConnectionEndpoint(new Container(),_appRegistry.getAuthenticationManager());
- _conn.setConnectionEventListener(new Connection_1_0(_appRegistry));
- _conn.setFrameOutputHandler(this);
-
-
- _frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry());
- _frameHandler = new FrameHandler(_conn);
-
- _networkDriver.getSender().send(HEADER.duplicate());
-
-
-
- }
-
- public SocketAddress getRemoteAddress()
- {
- return _networkDriver.getRemoteAddress();
- }
-
- public SocketAddress getLocalAddress()
- {
- return _networkDriver.getLocalAddress();
- }
-
- public long getReadBytes()
- {
- return _readBytes;
- }
-
- public long getWrittenBytes()
- {
- return _writtenBytes;
- }
-
- public void writerIdle()
- {
- //Todo
- }
-
- public void readerIdle()
- {
- //Todo
- }
-
- public String getAddress()
- {
- return getRemoteAddress().toString();
- }
-
-
- public ConfigStore getConfigStore()
- {
- return _appRegistry.getConfigStore();
- }
-
- public UUID getId()
- {
- return _id;
- }
-
- public ConnectionConfigType getConfigType()
- {
- return ConnectionConfigType.getInstance();
- }
-
- public boolean isDurable()
- {
- return false;
- }
-
- public synchronized void received(ByteBuffer msg)
- {
- _readBytes += msg.remaining();
- switch(_state)
- {
- case A:
- if(msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- break;
- }
- case M:
- if(msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- _state = State.M;
- break;
- }
-
- case Q:
- if(msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- _state = State.Q;
- break;
- }
- case P:
- if(msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- _state = State.P;
- break;
- }
- case PROTOCOL:
- if(msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- _state = State.PROTOCOL;
- break;
- }
- case MAJOR:
- if(msg.hasRemaining())
- {
- _major = msg.get();
- }
- else
- {
- _state = State.MAJOR;
- break;
- }
- case MINOR:
- if(msg.hasRemaining())
- {
- _minor = msg.get();
- }
- else
- {
- _state = State.MINOR;
- break;
- }
- case REVISION:
- if(msg.hasRemaining())
- {
- _revision = msg.get();
-
- _state = State.FRAME;
- }
- else
- {
- _state = State.REVISION;
- break;
- }
- case FRAME:
- _frameHandler.parse(msg);
- }
-
- }
-
- public void exception(Throwable t)
- {
- t.printStackTrace();
- }
-
- public void closed()
- {
- // todo
-
- }
-
- public long getCreateTime()
- {
- return _createTime;
- }
-
-
- public boolean canSend()
- {
- return true;
- }
-
- public void send(AMQFrame frame)
- {
- send(frame, null);
- }
- public void send(AMQFrame frame, ByteBuffer buffer)
- {
-
- synchronized(_sendLock)
- {
-
- if(_buf.remaining() < MAX_FRAME_SIZE)
- {
- _buf = ByteBuffer.allocate(1024*1024);
- }
-
- _frameWriter.setValue(frame);
-
- ByteBuffer dup = _buf.slice();
-
- _frameWriter.writeToBuffer(dup);
-
- _buf.position(_buf.position()+dup.position());
-
- dup.flip();
- _writtenBytes += dup.limit();
- _networkDriver.getSender().send(dup);
-
- }
- }
-
- public void close()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void setLogOutput(final PrintWriter out)
- {
- //TODO
- }
-
-
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index 3852253058..a71d396919 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -311,7 +311,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
public ServerProtocolEngine getProtocolEngine()
{
- return new ProtocolEngine_1_0_0(_network, _appRegistry);
+ return new ProtocolEngine_1_0_0(_appRegistry);
}
};
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
index 18b1b71d69..6c07e4912f 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
@@ -32,6 +32,7 @@ import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.configuration.*;
import org.apache.qpid.server.protocol.v1_0.Connection_1_0;
import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
import java.io.PrintWriter;
@@ -46,9 +47,7 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
{
static final AtomicLong _connectionIdSource = new AtomicLong(0L);
- public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
-
- private NetworkConnection _networkDriver;
+ //private NetworkConnection _networkDriver;
private long _readBytes;
private long _writtenBytes;
private final UUID _id;
@@ -57,7 +56,6 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
private ConnectionEndpoint _conn;
private final long _connectionId = _connectionIdSource.getAndIncrement();
- private static final int BUF_SIZE = 8;
private static final ByteBuffer HEADER =
ByteBuffer.wrap(new byte[]
{
@@ -73,11 +71,12 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
private FrameWriter _frameWriter;
private FrameHandler _frameHandler;
- private ByteBuffer _buf = ByteBuffer.allocate(1024 * 1024);
private Object _sendLock = new Object();
private byte _major;
private byte _minor;
private byte _revision;
+ private NetworkConnection _network;
+ private Sender<ByteBuffer> _sender;
static enum State {
@@ -94,56 +93,21 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
private State _state = State.A;
- public ProtocolEngine_1_0_0(NetworkConnection networkDriver,
- final IApplicationRegistry appRegistry)
+ public ProtocolEngine_1_0_0(final IApplicationRegistry appRegistry)
{
-
-
_id = appRegistry.getConfigStore().createId();
_appRegistry = appRegistry;
- setNetworkDriver(networkDriver);
-
-
- // FIXME Two log messages to maintain compatinbility with earlier protocol versions
-// _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false));
-// _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true));
}
- public void setNetworkDriver(NetworkConnection driver)
- {
- _networkDriver = driver;
- Container container = new Container();
-
- Principal principal = new Principal()
- {
-
- public String getName()
- {
- // TODO
- return "rob";
- }
- };
- _conn = new ConnectionEndpoint(container,_appRegistry.getAuthenticationManager());
- _conn.setConnectionEventListener(new Connection_1_0(_appRegistry));
- _conn.setFrameOutputHandler(this);
- _conn.setRemoteAddress(driver.getRemoteAddress());
-
-
- _frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry());
- _frameHandler = new FrameHandler(_conn);
-
- _networkDriver.getSender().send(HEADER.duplicate());
-
- }
public SocketAddress getRemoteAddress()
{
- return _networkDriver.getRemoteAddress();
+ return _network.getRemoteAddress();
}
public SocketAddress getLocalAddress()
{
- return _networkDriver.getLocalAddress();
+ return _network.getLocalAddress();
}
public long getReadBytes()
@@ -166,6 +130,25 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
//Todo
}
+ public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender)
+ {
+ _network = network;
+ _sender = sender;
+
+ Container container = new Container();
+
+ _conn = new ConnectionEndpoint(container,_appRegistry.getAuthenticationManager());
+ _conn.setConnectionEventListener(new Connection_1_0(_appRegistry));
+ _conn.setFrameOutputHandler(this);
+ _conn.setRemoteAddress(_network.getRemoteAddress());
+
+ _frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry());
+ _frameHandler = new FrameHandler(_conn);
+
+ _sender.send(HEADER.duplicate());
+ _sender.flush();
+ }
+
public String getAddress()
{
return getRemoteAddress().toString();
@@ -338,26 +321,19 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody());
}
- if(_buf.remaining() < _conn.getMaxFrameSize())
- {
- _buf = ByteBuffer.allocate(Math.min(_conn.getMaxFrameSize(),1024*1024));
- }
+
_frameWriter.setValue(amqFrame);
ByteBuffer dup = ByteBuffer.allocate(_conn.getMaxFrameSize());
- // int pos = _buf.position();
int size = _frameWriter.writeToBuffer(dup);
if(size > _conn.getMaxFrameSize())
{
-// _buf.position(pos);
throw new OversizeFrameException(amqFrame,size);
}
-// _buf.position(_buf.position()+dup.position());
-
dup.flip();
_writtenBytes += dup.limit();
@@ -371,7 +347,8 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
}
- _networkDriver.getSender().send(dup);
+ _sender.send(dup);
+ _sender.flush();
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
index c8041f35bb..a735c5bef3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
@@ -38,6 +38,7 @@ import org.apache.qpid.server.configuration.ConnectionConfigType;
import org.apache.qpid.server.protocol.v1_0.Connection_1_0;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
import java.io.PrintWriter;
@@ -50,7 +51,6 @@ import java.util.logging.Logger;
public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOutputHandler
{
- private NetworkConnection _networkDriver;
private long _readBytes;
private long _writtenBytes;
private final UUID _id;
@@ -59,7 +59,6 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
private ConnectionEndpoint _conn;
private long _connectionId = ProtocolEngine_1_0_0._connectionIdSource.getAndIncrement();
- private static final int BUF_SIZE = 8;
private static final ByteBuffer HEADER =
ByteBuffer.wrap(new byte[]
{
@@ -95,6 +94,8 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
private byte _minor;
private byte _revision;
private PrintWriter _out;
+ private NetworkConnection _network;
+ private Sender<ByteBuffer> _sender;
static enum State {
@@ -116,23 +117,46 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
{
_id = appRegistry.getConfigStore().createId();
_appRegistry = appRegistry;
- setNetworkDriver(networkDriver);
}
- public void setNetworkDriver(final NetworkConnection driver)
+
+ public SocketAddress getRemoteAddress()
{
- _networkDriver = driver;
- Container container = new Container();
+ return _network.getRemoteAddress();
+ }
- Principal principal = new Principal()
- {
+ public SocketAddress getLocalAddress()
+ {
+ return _network.getLocalAddress();
+ }
+
+ public long getReadBytes()
+ {
+ return _readBytes;
+ }
+
+ public long getWrittenBytes()
+ {
+ return _writtenBytes;
+ }
+
+ public void writerIdle()
+ {
+ //Todo
+ }
+
+ public void readerIdle()
+ {
+ //Todo
+ }
+
+ public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender)
+ {
+ _network = network;
+ _sender = sender;
+
+ Container container = new Container();
- public String getName()
- {
- // TODO
- return "rob";
- }
- };
_conn = new ConnectionEndpoint(container, ApplicationRegistry.getInstance().getAuthenticationManager());
_conn.setConnectionEventListener(new Connection_1_0(_appRegistry));
_conn.setRemoteAddress(getRemoteAddress());
@@ -149,51 +173,24 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
{
if(_conn.isAuthenticated())
{
- _networkDriver.getSender().send(PROTOCOL_HEADER.duplicate());
+ _sender.send(PROTOCOL_HEADER.duplicate());
+ _sender.flush();
}
else
{
- _networkDriver.close();
+ _network.close();
}
}
});
_frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry());
_frameHandler = new SASLFrameHandler(_conn);
- _networkDriver.getSender().send(HEADER.duplicate());
+ _sender.send(HEADER.duplicate());
+ _sender.flush();
_conn.initiateSASL();
- }
- public SocketAddress getRemoteAddress()
- {
- return _networkDriver.getRemoteAddress();
- }
-
- public SocketAddress getLocalAddress()
- {
- return _networkDriver.getLocalAddress();
- }
-
- public long getReadBytes()
- {
- return _readBytes;
- }
-
- public long getWrittenBytes()
- {
- return _writtenBytes;
- }
-
- public void writerIdle()
- {
- //Todo
- }
-
- public void readerIdle()
- {
- //Todo
}
public String getAddress()
@@ -366,41 +363,41 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
synchronized(_sendLock)
{
- if(FRAME_LOGGER.isLoggable(Level.FINE))
- {
- FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody());
- }
- if(_buf.remaining() < _conn.getMaxFrameSize())
+
+ if(FRAME_LOGGER.isLoggable(Level.FINE))
{
- _buf = ByteBuffer.allocate(Math.min(_conn.getMaxFrameSize(),1024*1024));
+ FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody());
}
+
+
_frameWriter.setValue(amqFrame);
- ByteBuffer dup = _buf.slice();
- int pos = _buf.position();
+
+
+ ByteBuffer dup = ByteBuffer.allocate(_conn.getMaxFrameSize());
int size = _frameWriter.writeToBuffer(dup);
if(size > _conn.getMaxFrameSize())
{
- _buf.position(pos);
throw new OversizeFrameException(amqFrame,size);
}
- _buf.position(_buf.position()+dup.position());
-
dup.flip();
_writtenBytes += dup.limit();
if(RAW_LOGGER.isLoggable(Level.FINE))
- {
- ByteBuffer dup2 = dup.duplicate();
- byte[] data = new byte[dup2.remaining()];
- dup2.get(data);
- Binary bin = new Binary(data);
- RAW_LOGGER.fine("SEND[" + getRemoteAddress() + "] : " + bin.toString());
- }
+ {
+ ByteBuffer dup2 = dup.duplicate();
+ byte[] data = new byte[dup2.remaining()];
+ dup2.get(data);
+ Binary bin = new Binary(data);
+ RAW_LOGGER.fine("SEND[" + getRemoteAddress() + "] : " + bin.toString());
+ }
+
+
+ _sender.send(dup);
+ _sender.flush();
- _networkDriver.getSender().send(dup);
}
}