summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2007-09-13 21:42:57 +0000
committerRafael H. Schloming <rhs@apache.org>2007-09-13 21:42:57 +0000
commite10d11937bccc3cdbdd867266501c3e16d28e933 (patch)
treeee31690915cbb880ba553708ed11b9b607b23a0b
parent0a1b3430450f274aee273a9f792a2d43f771b85f (diff)
downloadqpid-python-e10d11937bccc3cdbdd867266501c3e16d28e933.tar.gz
* moved most of the classes in the org.apache.qpidity package to
org.apache.qpidity.transport * factored out the network specific pieces into org.apache.qpidity.transport * moved the mina specific code to org.apache.qpidity.transport.network.mina * replaced the handler chain with Sender/Receiver chains that can deal with close request/closed notifications * moved from an anonymous struct[] to a real Header class * removed an excess copy from message data transmit git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@575474 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/Client.java53
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/DtxSession.java18
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/Session.java8
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java12
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java33
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java10
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java50
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java2
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java2
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java2
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java2
-rwxr-xr-xjava/common/generate5
-rw-r--r--java/common/pom.xml22
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Channel.java220
-rw-r--r--java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java53
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Connection.java141
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java13
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ContentHandler.java45
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Event.java44
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Handler.java35
-rw-r--r--java/common/src/main/java/org/apache/qpidity/HeaderHandler.java66
-rw-r--r--java/common/src/main/java/org/apache/qpidity/MethodDecoder.java62
-rw-r--r--java/common/src/main/java/org/apache/qpidity/MethodHandler.java42
-rw-r--r--java/common/src/main/java/org/apache/qpidity/SegmentAssembler.java65
-rw-r--r--java/common/src/main/java/org/apache/qpidity/SessionResolver.java47
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Stub.java109
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ToyBroker.java104
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ToyClient.java21
-rw-r--r--java/common/src/main/java/org/apache/qpidity/TrackSwitch.java39
-rw-r--r--java/common/src/main/java/org/apache/qpidity/TypeSwitch.java39
-rw-r--r--java/common/src/main/java/org/apache/qpidity/api/Message.java49
-rw-r--r--java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java6
-rw-r--r--java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java8
-rw-r--r--java/common/src/main/java/org/apache/qpidity/codec/Decoder.java4
-rw-r--r--java/common/src/main/java/org/apache/qpidity/codec/Encoder.java4
-rw-r--r--java/common/src/main/java/org/apache/qpidity/codec/SegmentEncoder.java175
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/AbstractDelegate.java (renamed from java/common/src/main/java/org/apache/qpidity/ProtocolActions.java)15
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Channel.java182
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java (renamed from java/common/src/main/java/org/apache/qpidity/ChannelDelegate.java)2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Connection.java104
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java (renamed from java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java)112
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/ConnectionEvent.java (renamed from java/common/src/main/java/org/apache/qpidity/ProtocolError.java)26
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Data.java84
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Future.java (renamed from java/common/src/main/java/org/apache/qpidity/Future.java)2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Header.java (renamed from java/common/src/main/java/org/apache/qpidity/Switch.java)56
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Method.java (renamed from java/common/src/main/java/org/apache/qpidity/Method.java)10
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java (renamed from java/common/src/main/java/org/apache/qpidity/Segment.java)54
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java (renamed from java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java)30
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/ProtocolHeader.java (renamed from java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java)33
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Range.java (renamed from java/common/src/main/java/org/apache/qpidity/Range.java)2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java (renamed from java/common/src/main/java/org/apache/qpidity/RangeSet.java)2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Receiver.java (renamed from java/common/src/main/java/org/apache/qpidity/Delegator.java)11
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Result.java (renamed from java/common/src/main/java/org/apache/qpidity/Result.java)2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Sender.java (renamed from java/common/src/main/java/org/apache/qpidity/Header.java)14
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Session.java (renamed from java/common/src/main/java/org/apache/qpidity/Session.java)48
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java (renamed from java/common/src/main/java/org/apache/qpidity/SessionDelegate.java)8
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Struct.java (renamed from java/common/src/main/java/org/apache/qpidity/Struct.java)4
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java184
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java168
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java (renamed from java/common/src/main/java/org/apache/qpidity/Frame.java)18
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java (renamed from java/common/src/main/java/org/apache/qpidity/InputHandler.java)33
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/NetworkDelegate.java (renamed from java/common/src/main/java/org/apache/qpidity/ProtocolHandler.java)9
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/NetworkEvent.java34
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java99
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java (renamed from java/common/src/main/java/org/apache/qpidity/MinaHandler.java)66
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java (renamed from java/common/src/main/java/org/apache/qpidity/BodyHandler.java)28
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java (renamed from java/common/src/main/java/org/apache/qpidity/Functions.java)2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/util/SliceIterator.java (renamed from java/common/src/main/java/org/apache/qpidity/SliceIterator.java)4
-rw-r--r--java/plugins/src/main/java/org/apache/qpid/plugins/JythonMojo.java36
84 files changed, 1511 insertions, 1639 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index aa6756d116..aaa724fd93 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -27,8 +27,8 @@ import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpidity.client.Session;
import org.apache.qpidity.client.util.MessagePartListenerAdapter;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.RangeSet;
-import org.apache.qpidity.Option;
+import org.apache.qpidity.transport.RangeSet;
+import org.apache.qpidity.transport.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index ec27fdbb71..3bd5856df5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -28,7 +28,7 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
import org.apache.qpidity.api.Message;
-import org.apache.qpidity.Struct;
+import org.apache.qpidity.transport.Struct;
import javax.jms.JMSException;
import java.io.IOException;
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 8ecb5ffd78..f3fa79eb51 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -26,7 +26,7 @@ import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpidity.jms.ExceptionHelper;
import org.apache.qpidity.client.util.ByteBufferMessage;
-import org.apache.qpidity.ReplyTo;
+import org.apache.qpidity.transport.ReplyTo;
import javax.jms.Message;
import javax.jms.JMSException;
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
index b115086d71..f5603b1695 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
@@ -27,9 +27,9 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpidity.Struct;
-import org.apache.qpidity.MessageProperties;
-import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.transport.Struct;
+import org.apache.qpidity.transport.MessageProperties;
+import org.apache.qpidity.transport.DeliveryProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
index 5c1ee713fc..6198f0504e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
@@ -27,7 +27,7 @@ import javax.jms.JMSException;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpidity.Struct;
+import org.apache.qpidity.transport.Struct;
public interface MessageFactory
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
index b60fc26fc0..13a2202e6f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
@@ -30,9 +30,9 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpidity.Struct;
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.transport.Struct;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
index 8d78f9f7fd..970ba5a66a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
@@ -25,8 +25,8 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.Struct;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.Struct;
/**
* This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Client.java b/java/client/src/main/java/org/apache/qpidity/client/Client.java
index fc89f2d368..3bc684a6ca 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/Client.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/Client.java
@@ -6,16 +6,18 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.qpidity.BrokerDetails;
-import org.apache.qpidity.Channel;
-import org.apache.qpidity.Connection;
-import org.apache.qpidity.ConnectionClose;
-import org.apache.qpidity.ConnectionDelegate;
import org.apache.qpidity.ErrorCode;
-import org.apache.qpidity.MinaHandler;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.SessionDelegate;
import org.apache.qpidity.client.impl.ClientSession;
import org.apache.qpidity.client.impl.ClientSessionDelegate;
+import org.apache.qpidity.transport.Channel;
+import org.apache.qpidity.transport.Connection;
+import org.apache.qpidity.transport.ConnectionClose;
+import org.apache.qpidity.transport.ConnectionDelegate;
+import org.apache.qpidity.transport.ConnectionEvent;
+import org.apache.qpidity.transport.ProtocolHeader;
+import org.apache.qpidity.transport.SessionDelegate;
+import org.apache.qpidity.transport.network.mina.MinaHandler;
import org.apache.qpidity.url.QpidURL;
@@ -25,47 +27,48 @@ public class Client implements org.apache.qpidity.client.Connection
private Connection _conn;
private ExceptionListener _exceptionListner;
private final Lock _lock = new ReentrantLock();
-
+
/**
- *
+ *
* @return returns a new connection to the broker.
*/
public static org.apache.qpidity.client.Connection createConnection()
{
return new Client();
}
-
+
public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException
{
Condition negotiationComplete = _lock.newCondition();
_lock.lock();
-
+
ConnectionDelegate connectionDelegate = new ConnectionDelegate()
- {
+ {
public SessionDelegate getSessionDelegate()
{
return new ClientSessionDelegate();
}
-
- @Override public void connectionClose(Channel context, ConnectionClose connectionClose)
+
+ @Override public void connectionClose(Channel context, ConnectionClose connectionClose)
{
_exceptionListner.onException(
- new QpidException("Server closed the connection: Reason " +
+ new QpidException("Server closed the connection: Reason " +
connectionClose.getReplyText(),
ErrorCode.get(connectionClose.getReplyCode()),
null));
}
};
-
+
connectionDelegate.setCondition(_lock,negotiationComplete);
connectionDelegate.setUsername(username);
connectionDelegate.setPassword(password);
connectionDelegate.setVirtualHost(virtualHost);
-
+
_conn = MinaHandler.connect(host, port,connectionDelegate);
-
- _conn.getOutputHandler().handle(_conn.getHeader().toByteBuffer());
-
+
+ // XXX: hardcoded version numbers
+ _conn.send(new ConnectionEvent(0, new ProtocolHeader(1, 0, 10)));
+
try
{
negotiationComplete.await();
@@ -79,7 +82,7 @@ public class Client implements org.apache.qpidity.client.Connection
_lock.unlock();
}
}
-
+
/*
* Until the dust settles with the URL disucssion
* I am not going to implement this.
@@ -94,9 +97,9 @@ public class Client implements org.apache.qpidity.client.Connection
details.getUserName(),
details.getPassword());
}
-
+
public void close() throws QpidException
- {
+ {
Channel ch = _conn.getChannel(0);
ch.connectionClose(0, "client is closing", 0, 0);
//need to close the connection underneath as well
@@ -104,7 +107,7 @@ public class Client implements org.apache.qpidity.client.Connection
public Session createSession(long expiryInSeconds)
{
- Channel ch = _conn.getChannel(_channelNo.incrementAndGet());
+ Channel ch = _conn.getChannel(_channelNo.incrementAndGet());
ClientSession ssn = new ClientSession();
ssn.attach(ch);
ssn.sessionOpen(expiryInSeconds);
@@ -116,10 +119,10 @@ public class Client implements org.apache.qpidity.client.Connection
// TODO Auto-generated method stub
return null;
}
-
+
public void setExceptionListener(ExceptionListener exceptionListner)
{
- _exceptionListner = exceptionListner;
+ _exceptionListner = exceptionListner;
}
}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java b/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java
index 21532f7d46..bf6433af6a 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java
@@ -18,15 +18,15 @@
*/
package org.apache.qpidity.client;
-import org.apache.qpidity.DtxCoordinationCommitResult;
-import org.apache.qpidity.DtxCoordinationGetTimeoutResult;
-import org.apache.qpidity.DtxCoordinationPrepareResult;
-import org.apache.qpidity.DtxCoordinationRecoverResult;
-import org.apache.qpidity.DtxCoordinationRollbackResult;
-import org.apache.qpidity.DtxDemarcationEndResult;
-import org.apache.qpidity.DtxDemarcationStartResult;
-import org.apache.qpidity.Future;
-import org.apache.qpidity.Option;
+import org.apache.qpidity.transport.DtxCoordinationCommitResult;
+import org.apache.qpidity.transport.DtxCoordinationGetTimeoutResult;
+import org.apache.qpidity.transport.DtxCoordinationPrepareResult;
+import org.apache.qpidity.transport.DtxCoordinationRecoverResult;
+import org.apache.qpidity.transport.DtxCoordinationRollbackResult;
+import org.apache.qpidity.transport.DtxDemarcationEndResult;
+import org.apache.qpidity.transport.DtxDemarcationStartResult;
+import org.apache.qpidity.transport.Future;
+import org.apache.qpidity.transport.Option;
/**
* This session�s resources are control under the scope of a distributed transaction.
diff --git a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java
index 4ccef6df55..f0c2c8ca9c 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java
@@ -19,7 +19,7 @@ package org.apache.qpidity.client;
import java.nio.ByteBuffer;
-import org.apache.qpidity.Struct;
+import org.apache.qpidity.transport.Header;
/**
* Assembles message parts.
@@ -47,7 +47,7 @@ public interface MessagePartListener
*
* @param headers Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code>
*/
- public void messageHeaders(Struct... headers);
+ public void messageHeader(Header header);
/**
* Add the following byte array to the content of the message being received
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java
index d8be937e46..2340d27882 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/Session.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java
@@ -22,9 +22,9 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
-import org.apache.qpidity.Option;
-import org.apache.qpidity.RangeSet;
-import org.apache.qpidity.Struct;
+import org.apache.qpidity.transport.Option;
+import org.apache.qpidity.transport.RangeSet;
+import org.apache.qpidity.transport.Struct;
import org.apache.qpidity.api.Message;
/**
@@ -186,7 +186,7 @@ public interface Session
* @see org.apache.qpidity.DeliveryProperties
* @see org.apache.qpidity.MessageProperties
*/
- public void headers(Struct... headers);
+ public void header(Struct... headers);
/**
* Add the following byte array to the content of the message being sent.
diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
index ea225976f2..2e53bdfcad 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
@@ -5,10 +5,10 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-import org.apache.qpidity.Option;
+import org.apache.qpidity.transport.Option;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.Range;
-import org.apache.qpidity.RangeSet;
+import org.apache.qpidity.transport.Range;
+import org.apache.qpidity.transport.RangeSet;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.client.ExceptionListener;
import org.apache.qpidity.client.MessagePartListener;
@@ -16,7 +16,7 @@ import org.apache.qpidity.client.MessagePartListener;
/**
* Implements a Qpid Sesion.
*/
-public class ClientSession extends org.apache.qpidity.Session implements org.apache.qpidity.client.Session
+public class ClientSession extends org.apache.qpidity.transport.Session implements org.apache.qpidity.client.Session
{
private Map<String,MessagePartListener> _messageListeners = new HashMap<String,MessagePartListener>();
private ExceptionListener _exceptionListner;
@@ -46,7 +46,7 @@ public class ClientSession extends org.apache.qpidity.Session implements org.apa
// The javadoc clearly says that this method is suitable for small messages
// therefore reading the content in one shot.
super.messageTransfer(destination, confirmMode, acquireMode);
- super.headers(msg.getDeliveryProperties(),msg.getMessageProperties());
+ super.header(msg.getDeliveryProperties(),msg.getMessageProperties());
super.data(msg.readData());
super.endData();
}
@@ -54,7 +54,7 @@ public class ClientSession extends org.apache.qpidity.Session implements org.apa
public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException
{
super.messageTransfer(destination, confirmMode, acquireMode);
- super.headers(msg.getDeliveryProperties(),msg.getMessageProperties());
+ super.header(msg.getDeliveryProperties(),msg.getMessageProperties());
boolean b = true;
int count = 0;
while(b)
diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
index c4565d4544..1e003b23e6 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
@@ -3,18 +3,21 @@ package org.apache.qpidity.client.impl;
import java.nio.ByteBuffer;
import org.apache.qpidity.ErrorCode;
-import org.apache.qpidity.Frame;
-import org.apache.qpidity.MessageAcquired;
-import org.apache.qpidity.MessageReject;
-import org.apache.qpidity.MessageTransfer;
-import org.apache.qpidity.QpidException;
-import org.apache.qpidity.Range;
-import org.apache.qpidity.Session;
-import org.apache.qpidity.SessionClosed;
-import org.apache.qpidity.SessionDelegate;
-import org.apache.qpidity.Struct;
+
import org.apache.qpidity.client.MessagePartListener;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.transport.Data;
+import org.apache.qpidity.transport.Header;
+import org.apache.qpidity.transport.MessageAcquired;
+import org.apache.qpidity.transport.MessageReject;
+import org.apache.qpidity.transport.MessageTransfer;
+import org.apache.qpidity.transport.Range;
+import org.apache.qpidity.transport.Session;
+import org.apache.qpidity.transport.SessionClosed;
+import org.apache.qpidity.transport.SessionDelegate;
+import org.apache.qpidity.transport.Struct;
+
public class ClientSessionDelegate extends SessionDelegate
{
@@ -29,22 +32,22 @@ public class ClientSessionDelegate extends SessionDelegate
// --------------------------------------------
// Message methods
// --------------------------------------------
- @Override public void data(Session ssn, Frame frame)
+ @Override public void data(Session ssn, Data data)
{
- for (ByteBuffer b : frame)
+ for (ByteBuffer b : data.getFragments())
{
_currentMessageListener.data(b);
}
- if (frame.isLastSegment() && frame.isLastFrame())
+ if (data.isLast())
{
_currentMessageListener.messageReceived();
}
}
- @Override public void headers(Session ssn, Struct... headers)
+ @Override public void header(Session ssn, Header header)
{
- _currentMessageListener.messageHeaders(headers);
+ _currentMessageListener.messageHeader(header);
}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java b/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java
index 1f8e9610d1..c15ac8f6e5 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java
@@ -1,7 +1,5 @@
package org.apache.qpidity.client.impl;
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.MessageProperties;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.client.Client;
@@ -10,6 +8,8 @@ import org.apache.qpidity.client.ExceptionListener;
import org.apache.qpidity.client.Session;
import org.apache.qpidity.client.util.MessageListener;
import org.apache.qpidity.client.util.MessagePartListenerAdapter;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
public class DemoClient
{
@@ -53,14 +53,14 @@ public class DemoClient
// queue
ssn.messageTransfer("amq.direct", (short) 0, (short) 1);
- ssn.headers(new DeliveryProperties().setRoutingKey("queue1"),new MessageProperties().setMessageId("123"));
+ ssn.header(new DeliveryProperties().setRoutingKey("queue1"),new MessageProperties().setMessageId("123"));
ssn.data("this is the data");
ssn.endData();
//reject
ssn.messageTransfer("amq.direct", (short) 0, (short) 1);
ssn.data("this should be rejected");
- ssn.headers(new DeliveryProperties().setRoutingKey("stocks"));
+ ssn.header(new DeliveryProperties().setRoutingKey("stocks"));
ssn.endData();
ssn.sync();
@@ -81,7 +81,7 @@ public class DemoClient
// topic
ssn.messageTransfer("amq.topic", (short) 0, (short) 1);
ssn.data("Topic message");
- ssn.headers(new DeliveryProperties().setRoutingKey("stock.us.ibm"),new MessageProperties().setMessageId("456"));
+ ssn.header(new DeliveryProperties().setRoutingKey("stock.us.ibm"),new MessageProperties().setMessageId("456"));
ssn.endData();
ssn.sync();
}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java b/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java
index acf5c283b2..4e90a82fff 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java
@@ -2,8 +2,6 @@ package org.apache.qpidity.client.impl;
import java.io.FileInputStream;
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.MessageProperties;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.client.Client;
@@ -13,6 +11,8 @@ import org.apache.qpidity.client.Session;
import org.apache.qpidity.client.util.FileMessage;
import org.apache.qpidity.client.util.MessageListener;
import org.apache.qpidity.client.util.MessagePartListenerAdapter;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
public class LargeMsgDemoClient
{
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
index 218c6cd018..bf0889a913 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
@@ -5,8 +5,8 @@ import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Queue;
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
import org.apache.qpidity.api.Message;
/**
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
index 73e71ed84d..34fc057724 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
@@ -7,8 +7,8 @@ import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
import org.apache.qpidity.api.Message;
/**
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
index e4f19ea6c3..c40a85863d 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
@@ -3,16 +3,16 @@ package org.apache.qpidity.client.util;
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.MessageProperties;
-import org.apache.qpidity.Struct;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
+import org.apache.qpidity.transport.Header;
import org.apache.qpidity.client.MessagePartListener;
-/**
+/**
* This is a simple message assembler.
- * Will call onMessage method of the adaptee
+ * Will call onMessage method of the adaptee
* when all message data is read.
- *
+ *
* This is a good convinience utility for handling
* small messages
*/
@@ -20,17 +20,17 @@ public class MessagePartListenerAdapter implements MessagePartListener
{
MessageListener _adaptee;
ByteBufferMessage _currentMsg;
-
+
public MessagePartListenerAdapter(MessageListener listener)
{
- _adaptee = listener;
+ _adaptee = listener;
}
-
+
public void messageTransfer(long transferId)
{
_currentMsg = new ByteBufferMessage(transferId);
}
-
+
public void data(ByteBuffer src)
{
try
@@ -40,28 +40,20 @@ public class MessagePartListenerAdapter implements MessagePartListener
catch(IOException e)
{
// A chance for IO exception
- // doesn't occur as we are using
+ // doesn't occur as we are using
// a ByteBuffer
}
}
- public void messageHeaders(Struct... headers)
- {
- for(Struct struct: headers)
- {
- if(struct instanceof DeliveryProperties)
- {
- _currentMsg.setDeliveryProperties((DeliveryProperties)struct);
- }
- else if (struct instanceof MessageProperties)
- {
- _currentMsg.setMessageProperties((MessageProperties)struct);
- }
- }
- }
-
- public void messageReceived()
- {
+ public void messageHeader(Header header)
+ {
+ _currentMsg.setDeliveryProperties(header.get(DeliveryProperties.class));
+ _currentMsg.setMessageProperties(header.get(MessageProperties.class));
+ }
+
+ public void messageReceived()
+ {
_adaptee.onMessage(_currentMsg);
- }
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java
index 2527856798..dfb65214d2 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java
@@ -2,8 +2,8 @@ package org.apache.qpidity.client.util;
import java.nio.ByteBuffer;
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
import org.apache.qpidity.api.Message;
public abstract class ReadOnlyMessage implements Message
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java
index 23089c7931..37e726d6c6 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java
@@ -5,8 +5,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
import org.apache.qpidity.api.Message;
public class StreamingMessage extends ReadOnlyMessage implements Message
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
index a3c03ca6d0..8b6e694d25 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
@@ -26,9 +26,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
-import org.apache.qpidity.Option;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.RangeSet;
import org.apache.qpidity.client.MessagePartListener;
import org.apache.qpidity.client.util.MessagePartListenerAdapter;
import org.apache.qpidity.exchange.ExchangeDefaults;
@@ -36,6 +34,8 @@ import org.apache.qpidity.filter.JMSSelectorFilter;
import org.apache.qpidity.filter.MessageFilter;
import org.apache.qpidity.jms.message.MessageFactory;
import org.apache.qpidity.jms.message.QpidMessage;
+import org.apache.qpidity.transport.Option;
+import org.apache.qpidity.transport.RangeSet;
/**
* Implementation of JMS message consumer
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java
index 9ed74e1cd0..ec837bc4e6 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java
@@ -18,7 +18,7 @@
package org.apache.qpidity.jms;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.Option;
+import org.apache.qpidity.transport.Option;
import org.apache.qpidity.url.BindingURL;
import org.apache.qpidity.exchange.ExchangeDefaults;
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
index 675a51625a..40bf7f15a8 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
@@ -21,7 +21,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpidity.jms.message.*;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.RangeSet;
+import org.apache.qpidity.transport.RangeSet;
import javax.jms.*;
import javax.jms.IllegalStateException;
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java
index 22feb29598..d23b08e37a 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java
@@ -18,8 +18,8 @@
package org.apache.qpidity.jms;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.Option;
import org.apache.qpidity.exchange.ExchangeDefaults;
+import org.apache.qpidity.transport.Option;
import org.apache.qpidity.url.BindingURL;
import javax.jms.Topic;
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
index b84b55966e..dab7585deb 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
@@ -21,8 +21,10 @@ import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
-import org.apache.qpidity.*;
+import org.apache.qpidity.QpidException;
import org.apache.qpidity.dtx.XidImpl;
+import org.apache.qpidity.transport.*;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
index c11a7d8c3b..a7e4d02651 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
@@ -29,8 +29,8 @@ import java.io.IOException;
import org.apache.qpidity.ErrorCode;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.ReplyTo;
import org.apache.qpidity.client.util.ByteBufferMessage;
+import org.apache.qpidity.transport.ReplyTo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/java/common/generate b/java/common/generate
index ac49c6283d..b2448c6e5d 100755
--- a/java/common/generate
+++ b/java/common/generate
@@ -8,6 +8,7 @@ from cStringIO import StringIO
out_dir=sys.argv[1]
out_pkg = sys.argv[2]
spec_file = sys.argv[3]
+
spec = mllib.xml_parse(spec_file)
class Output:
@@ -27,6 +28,8 @@ class Output:
self.line("import org.apache.qpidity.codec.Encodable;")
self.line("import org.apache.qpidity.codec.Encoder;")
self.line()
+ self.line("import org.apache.qpidity.transport.network.Frame;")
+ self.line()
self.line()
def line(self, l = ""):
@@ -363,7 +366,7 @@ fct.line("}");
fct.write()
dlg = Output(out_dir, out_pkg, "Delegate")
-dlg.line("public abstract class Delegate<C> {")
+dlg.line("public abstract class Delegate<C> extends AbstractDelegate<C> {")
for s in structs:
dlg.line(" public void %s(C context, %s struct) {}" %
(dromedary(s.name), s.name))
diff --git a/java/common/pom.xml b/java/common/pom.xml
index d480aecb1b..25f8550304 100644
--- a/java/common/pom.xml
+++ b/java/common/pom.xml
@@ -6,9 +6,9 @@
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
-
+
http://www.apache.org/licenses/LICENSE-2.0
-
+
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -87,9 +87,11 @@
<param>-Dpython.path=${basedir}/jython-lib.jar/Lib${path.separator}${mllib.dir}</param>
<param>${basedir}/generate</param>
<param>${generated.path}</param>
- <param>org.apache.qpidity</param>
+ <param>org.apache.qpidity.transport</param>
<param>${specs.dir}/amqp.0-10-preview.xml</param>
</params>
+ <source>${specs.dir}/amqp.0-10-preview.xml</source>
+ <timestamp>${generated.path}/generated.timestamp</timestamp>
</configuration>
<goals>
<goal>jython</goal>
@@ -97,8 +99,8 @@
</execution>
</executions>
</plugin>
-
- <!-- Generates message selector grammar -->
+
+ <!-- Generates message selector grammar -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>javacc-maven-plugin</artifactId>
@@ -145,13 +147,13 @@
</executions>
</plugin>
-
+
</plugins>
</build>
<dependencies>
- <dependency>
+ <dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
</dependency>
@@ -166,14 +168,14 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
- <version>1.4.0</version>
+ <version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.4.0</version>
- <scope>test</scope>
+ <scope>test</scope>
</dependency>
<dependency>
@@ -204,7 +206,7 @@
<scope>provided</scope>
</dependency>
<!--- This is used by filter -->
- <dependency>
+ <dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
</dependency>
diff --git a/java/common/src/main/java/org/apache/qpidity/Channel.java b/java/common/src/main/java/org/apache/qpidity/Channel.java
deleted file mode 100644
index 3c734ba8f4..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/Channel.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpidity;
-
-import java.nio.ByteBuffer;
-
-import java.util.List;
-import java.util.ArrayList;
-
-import org.apache.qpidity.codec.SegmentEncoder;
-import org.apache.qpidity.codec.SizeEncoder;
-
-import static org.apache.qpidity.Frame.*;
-import static org.apache.qpidity.Functions.*;
-
-
-/**
- * Channel
- *
- * @author Rafael H. Schloming
- */
-
-public class Channel extends Invoker implements Handler<Frame>
-{
-
- final private Connection connection;
- final private int channel;
- final private TrackSwitch<Channel> tracks;
- final private Delegate<Channel> delegate;
- final private SessionDelegate sessionDelegate;
- // session may be null
- private Session session;
-
- private Method method = null;
- private List<ByteBuffer> data = null;
- private int dataSize;
-
- public Channel(Connection connection, int channel, SessionDelegate delegate)
- {
- this.connection = connection;
- this.channel = channel;
- this.delegate = new ChannelDelegate();
- this.sessionDelegate = delegate;
-
- tracks = new TrackSwitch<Channel>();
- tracks.map(L1, new MethodHandler<Channel>
- (getMajor(), getMinor(), connection.getConnectionDelegate()));
- tracks.map(L2, new MethodHandler<Channel>
- (getMajor(), getMinor(), this.delegate));
- tracks.map(L3, new SessionResolver<Frame>
- (new MethodHandler<Session>
- (getMajor(), getMinor(), delegate)));
- tracks.map(L4, new SessionResolver<Frame>
- (new ContentHandler(getMajor(), getMinor(), delegate)));
- }
-
- public byte getMajor()
- {
- return connection.getMajor();
- }
-
- public byte getMinor()
- {
- return connection.getMinor();
- }
-
- public int getEncodedChannel() {
- return channel;
- }
-
- public Session getSession()
- {
- return session;
- }
-
- void setSession(Session session)
- {
- this.session = session;
- }
-
- public void handle(Frame frame)
- {
- tracks.handle(new Event<Channel,Frame>(this, frame));
- }
-
- private SegmentEncoder newEncoder(byte flags, byte track, byte type, int size)
- {
- return new SegmentEncoder(getMajor(),
- getMinor(),
- connection.getOutputHandler(),
- connection.getMaxFrame(),
- (byte) (flags | VERSION),
- track,
- type,
- channel,
- size);
- }
-
- public void method(Method m)
- {
- SizeEncoder sizer = new SizeEncoder(getMajor(), getMinor());
- sizer.writeLong(m.getEncodedType());
- m.write(sizer, getMajor(), getMinor());
- sizer.flush();
- int size = sizer.getSize();
-
- byte flags = FIRST_SEG;
-
- if (!m.hasPayload())
- {
- flags |= LAST_SEG;
- }
-
- SegmentEncoder enc = newEncoder(flags, m.getEncodedTrack(),
- m.getSegmentType(), size);
- enc.writeLong(m.getEncodedType());
- m.write(enc, getMajor(), getMinor());
- enc.flush();
-
- if (m.hasPayload())
- {
- method = m;
- }
-
- if (m.getEncodedTrack() != Frame.L4)
- {
- System.out.println("sent control " + m.getClass().getName());
- }
- }
-
- public void headers(Struct ... headers)
- {
- if (method == null)
- {
- throw new IllegalStateException("cannot write headers without method");
- }
-
- SizeEncoder sizer = new SizeEncoder(getMajor(), getMinor());
- for (Struct hdr : headers)
- {
- sizer.writeLongStruct(hdr);
- }
-
- SegmentEncoder enc = newEncoder((byte) 0x0,
- method.getEncodedTrack(),
- HEADER,
- sizer.getSize());
- for (Struct hdr : headers)
- {
- enc.writeLongStruct(hdr);
- enc.flush();
- System.out.println("sent " + hdr);
- }
- }
-
- public void data(ByteBuffer buf)
- {
- if (data == null)
- {
- data = new ArrayList<ByteBuffer>();
- dataSize = 0;
- }
- data.add(buf);
- dataSize += buf.remaining();
- }
-
- public void data(String str)
- {
- data(str.getBytes());
- }
-
- public void data(byte[] bytes)
- {
- data(ByteBuffer.wrap(bytes));
- }
-
- public void end()
- {
- byte flags = LAST_SEG;
- SegmentEncoder enc = newEncoder(flags, method.getEncodedTrack(),
- BODY, dataSize);
- for (ByteBuffer buf : data)
- {
- enc.put(buf);
- System.out.println("sent " + str(buf));
- }
- enc.flush();
- data = null;
- dataSize = 0;
- }
-
- protected void invoke(Method m)
- {
- method(m);
- }
-
- protected <T> Future<T> invoke(Method m, Class<T> cls)
- {
- throw new UnsupportedOperationException();
- }
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java b/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java
deleted file mode 100644
index c6190dc3d7..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpidity;
-
-
-/**
- * CommandDispatcher
- *
- * @author Rafael H. Schloming
- */
-
-class CommandDispatcher implements Handler<Event<Session,Method>>
-{
-
- private final Delegate<Session> delegate;
-
- public CommandDispatcher(Delegate<Session> delegate)
- {
- this.delegate = delegate;
- }
-
- public void handle(Event<Session,Method> event)
- {
- Session ssn = event.context;
- Method method = event.target;
- method.setId(ssn.nextCommandId());
- System.out.println("\n Delegating " + method.getClass().getName() + "[" + method.getId() + "] to " + delegate.getClass().getName() + "\n");
- method.delegate(ssn, delegate);
- if (!method.hasPayload())
- {
- ssn.processed(method);
- }
- }
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/Connection.java b/java/common/src/main/java/org/apache/qpidity/Connection.java
deleted file mode 100644
index b70c8fae18..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/Connection.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpidity;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import java.nio.ByteBuffer;
-
-
-/**
- * Connection
- *
- * @author Rafael H. Schloming
- *
- * @todo the channels map should probably be replaced with something
- * more efficient, e.g. an array or a map implementation that can use
- * short instead of Short
- */
-
-// RA making this public until we sort out the package issues
-public class Connection implements ProtocolActions
-{
-
- final private Handler<ByteBuffer> input;
- final private Handler<ByteBuffer> output;
- final private ConnectionDelegate delegate;
-
- final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>();
- // XXX: hardcoded versions
- private ProtocolHeader header = new ProtocolHeader((byte) 1, (byte) 0, (byte) 10);
- // XXX
- private int maxFrame = 64*1024;
-
- public Connection(Handler<ByteBuffer> output,
- ConnectionDelegate delegate,
- InputHandler.State state)
- {
- this.input = new InputHandler(this, state);
- this.output = output;
- this.delegate = delegate;
- }
-
- public ConnectionDelegate getConnectionDelegate()
- {
- return delegate;
- }
-
- public Connection(Handler<ByteBuffer> output,
- ConnectionDelegate delegate)
- {
- this(output, delegate, InputHandler.State.PROTO_HDR);
- }
-
- public Handler<ByteBuffer> getInputHandler()
- {
- return input;
- }
-
- public Handler<ByteBuffer> getOutputHandler()
- {
- return output;
- }
-
- public ProtocolHeader getHeader()
- {
- return header;
- }
-
- public byte getMajor()
- {
- return header.getMajor();
- }
-
- public byte getMinor()
- {
- return header.getMinor();
- }
-
- public int getMaxFrame()
- {
- return maxFrame;
- }
-
- public void init(ProtocolHeader hdr)
- {
- System.out.println(header);
- if (hdr.getMajor() != header.getMajor() &&
- hdr.getMinor() != header.getMinor())
- {
- output.handle(header.toByteBuffer());
- // XXX: how do we close the connection?
- }
-
- // not sure if this is the right place
- System.out.println("\n--------------------Broker Start Connection Negotiation -----------------------\n");
-
- getChannel(0).connectionStart(header.getMajor(), header.getMinor(), null, "PLAIN", "utf8");
- }
-
- public Channel getChannel(int number)
- {
- Channel channel = channels.get(number);
- if (channel == null)
- {
- channel = new Channel(this, number, delegate.getSessionDelegate());
- channels.put(number, channel);
- }
- return channel;
- }
-
- public void frame(Frame frame)
- {
- Channel channel = getChannel(frame.getChannel());
- channel.handle(frame);
- }
-
- public void error(ProtocolError error)
- {
- throw new RuntimeException(error.getMessage());
- }
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java b/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java
index fa35e9332f..4e05aa574c 100644
--- a/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java
+++ b/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java
@@ -22,7 +22,9 @@ package org.apache.qpidity;
import java.nio.ByteBuffer;
-import static org.apache.qpidity.Functions.*;
+import org.apache.qpidity.transport.Sender;
+
+import static org.apache.qpidity.transport.util.Functions.*;
/**
@@ -31,12 +33,17 @@ import static org.apache.qpidity.Functions.*;
* @author Rafael H. Schloming
*/
-class ConsoleOutput implements Handler<ByteBuffer>
+public class ConsoleOutput implements Sender<ByteBuffer>
{
- public void handle(ByteBuffer buf)
+ public void send(ByteBuffer buf)
{
System.out.println(str(buf));
}
+ public void close()
+ {
+ System.out.println("CLOSED");
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpidity/ContentHandler.java b/java/common/src/main/java/org/apache/qpidity/ContentHandler.java
deleted file mode 100644
index b54e28c85e..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/ContentHandler.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpidity;
-
-
-/**
- * ContentHandler is a stateful handler that aggregates and dispatches
- * method and header frames, and passes body frames through to another
- * handler.
- *
- * @author Rafael H. Schloming
- */
-
-class ContentHandler extends TypeSwitch<Session>
-{
-
- public ContentHandler(byte major, byte minor, SessionDelegate delegate)
- {
- CommandDispatcher disp = new CommandDispatcher(delegate);
- MethodDecoder<Session> dec = new MethodDecoder<Session>(major, minor, disp);
- HeaderHandler hh = new HeaderHandler(major, minor, delegate);
- map(Frame.METHOD, new SegmentAssembler<Session>(dec));
- map(Frame.HEADER, new SegmentAssembler<Session>(hh));
- map(Frame.BODY, new BodyHandler(delegate));
- }
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/Event.java b/java/common/src/main/java/org/apache/qpidity/Event.java
deleted file mode 100644
index ae210f4c20..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/Event.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpidity;
-
-
-/**
- * Events are a common class of thing to handle. An event has a
- * context and a target. This division permits the same target
- * instance to be used in a variety of contexts.
- *
- * @author Rafael H. Schloming
- */
-
-public class Event<C, T>
-{
-
- C context;
- T target;
-
- public Event(C context, T target)
- {
- this.context = context;
- this.target = target;
- }
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/Handler.java b/java/common/src/main/java/org/apache/qpidity/Handler.java
deleted file mode 100644
index 0ff949cc1d..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/Handler.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpidity;
-
-/**
- * Handler is a basic interface used throughout this library for
- * callbacks, listeners, event handlers, etc.
- *
- * @author Rafael H. Schloming
- */
-
-public interface Handler<E>
-{
-
- void handle(E event);
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/HeaderHandler.java b/java/common/src/main/java/org/apache/qpidity/HeaderHandler.java
deleted file mode 100644
index 8737932712..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/HeaderHandler.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpidity;
-
-import java.nio.ByteBuffer;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-
-import org.apache.qpidity.codec.FragmentDecoder;
-
-
-/**
- * HeaderHandler
- *
- * @author Rafael H. Schloming
- */
-
-class HeaderHandler implements Handler<Event<Session,Segment>>
-{
-
- private static final Struct[] EMPTY_STRUCT_ARRAY = {};
-
- private final byte major;
- private final byte minor;
- private final SessionDelegate delegate;
-
- public HeaderHandler(byte major, byte minor, SessionDelegate delegate)
- {
- this.major = major;
- this.minor = minor;
- this.delegate = delegate;
- }
-
- public void handle(Event<Session,Segment> event)
- {
- System.out.println("got header segment:\n " + event.target);
- Iterator<ByteBuffer> fragments = event.target.getFragments();
- FragmentDecoder dec = new FragmentDecoder(major, minor, fragments);
- ArrayList<Struct> headers = new ArrayList();
- while (dec.hasRemaining())
- {
- headers.add(dec.readLongStruct());
- }
- delegate.headers(event.context, headers.toArray(EMPTY_STRUCT_ARRAY));
- }
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java b/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java
deleted file mode 100644
index b01f067381..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpidity;
-
-import java.nio.ByteBuffer;
-
-import java.util.Iterator;
-
-import org.apache.qpidity.codec.Decoder;
-import org.apache.qpidity.codec.FragmentDecoder;
-
-
-/**
- * MethodDecoder
- *
- * @author Rafael H. Schloming
- */
-
-class MethodDecoder<C> implements Handler<Event<C,Segment>>
-{
-
- private final byte major;
- private final byte minor;
- private final Handler<Event<C,Method>> handler;
-
- public MethodDecoder(byte major, byte minor, Handler<Event<C,Method>> handler)
- {
- this.major = major;
- this.minor = minor;
- this.handler = handler;
- }
-
- public void handle(Event<C,Segment> event)
- {
- //System.out.println("got method segment:\n " + event.target);
- Iterator<ByteBuffer> fragments = event.target.getFragments();
- Decoder dec = new FragmentDecoder(major, minor, fragments);
- int type = (int) dec.readLong();
- Method method = Method.create(type);
- method.read(dec, major, minor);
- handler.handle(new Event<C,Method>(event.context, method));
- }
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/MethodHandler.java b/java/common/src/main/java/org/apache/qpidity/MethodHandler.java
deleted file mode 100644
index 86dac241a2..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/MethodHandler.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpidity;
-
-
-/**
- * MethodHandler is a stateful handler that aggregates frames into
- * method segments and dispatches the resulting method. It does not
- * accept any segment type other than Frame.METHOD.
- *
- * @author Rafael H. Schloming
- */
-
-class MethodHandler<C> extends TypeSwitch<C>
-{
-
- public MethodHandler(byte major, byte minor, Delegate<C> delegate)
- {
- MethodDispatcher disp = new MethodDispatcher<C>(delegate);
- MethodDecoder<C> dec = new MethodDecoder<C>(major, minor, disp);
- map(Frame.METHOD, new SegmentAssembler<C>(dec));
- }
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/SegmentAssembler.java b/java/common/src/main/java/org/apache/qpidity/SegmentAssembler.java
deleted file mode 100644
index 0fb90a2f53..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/SegmentAssembler.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpidity;
-
-import java.nio.ByteBuffer;
-
-
-/**
- * SegmentAssembler is a stateful handler that aggregates Frame events
- * into Segment events. This should only be used where it is necessary
- * to assemble a Segment before processing, e.g. for Method and Header
- * segments.
- *
- * @author Rafael H. Schloming
- */
-
-class SegmentAssembler<C> implements Handler<Event<C,Frame>>
-{
-
- final private Handler<Event<C,Segment>> handler;
- private Segment segment;
-
- public SegmentAssembler(Handler<Event<C,Segment>> handler)
- {
- this.handler = handler;
- }
-
- public void handle(Event<C, Frame> event)
- {
- Frame frame = event.target;
- if (frame.isFirstFrame())
- {
- segment = new Segment();
- }
-
- for (ByteBuffer fragment : frame)
- {
- segment.add(fragment);
- }
-
- if (frame.isLastFrame())
- {
- handler.handle(new Event<C, Segment>(event.context, segment));
- }
- }
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/SessionResolver.java b/java/common/src/main/java/org/apache/qpidity/SessionResolver.java
deleted file mode 100644
index 24e52839fd..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/SessionResolver.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpidity;
-
-
-/**
- * SessionResolver is a stateless handler that accepts incoming events
- * whose context is a Channel, and produces an event whose context is
- * a Session.
- *
- * @author Rafael H. Schloming
- */
-
-class SessionResolver<T> implements Handler<Event<Channel,T>>
-{
-
- final private Handler<Event<Session,T>> handler;
-
- public SessionResolver(Handler<Event<Session,T>> handler)
- {
- this.handler = handler;
- }
-
- public void handle(Event<Channel,T> event)
- {
- handler.handle(new Event<Session,T>(event.context.getSession(), event.target));
- }
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/Stub.java b/java/common/src/main/java/org/apache/qpidity/Stub.java
deleted file mode 100644
index 7aa5030672..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/Stub.java
+++ /dev/null
@@ -1,109 +0,0 @@
-package org.apache.qpidity;
-
-import java.util.*;
-import java.lang.annotation.*;
-
-import java.nio.ByteBuffer;
-
-import org.apache.qpidity.codec.BBEncoder;
-import org.apache.qpidity.codec.Encoder;
-import org.apache.qpidity.codec.SizeEncoder;
-
-import static org.apache.qpidity.Option.*;
-
-
-public class Stub {
-
- private static final byte major = 0;
- private static final byte minor = 10;
-
- private static Connection conn = new Connection(new ConsoleOutput(),
- SessionDelegateStub.source());
-
- static
- {
- conn.init(new ProtocolHeader((byte) 1, major, minor));
- }
-
- private static void frame(byte track, byte type, boolean first, boolean last) {
- frame(track, type, first, last, null);
- }
-
- private static void frame(byte track, byte type, boolean first, boolean last, Method m) {
- SizeEncoder sizer = new SizeEncoder(major, minor);
- if (m != null) {
- sizer.writeLong(m.getEncodedType());
- m.write(sizer, major, minor);
- sizer.flush();
- }
- ByteBuffer buf = ByteBuffer.allocate(sizer.getSize());
- if (m != null) {
- Encoder enc = new BBEncoder(major, minor, buf);
- enc.writeLong(m.getEncodedType());
- m.write(enc, major, minor);
- enc.flush();
- }
- buf.flip();
- byte flags = Frame.VERSION;
- if (first) { flags |= Frame.FIRST_FRAME; }
- if (last) { flags |= Frame.LAST_FRAME; }
- Frame frame = new Frame(flags, type, track, 0);
- frame.addFragment(buf);
- conn.frame(frame);
- }
-
- public static final void main(String[] args) {
- frame(Frame.L2, Frame.METHOD, true, true, new SessionOpen(0));
- frame(Frame.L4, Frame.METHOD, true, false,
- new QueueDeclare("asdf", "alternate", null, DURABLE));
- frame(Frame.L4, Frame.METHOD, false, false);
- frame(Frame.L3, Frame.METHOD, true, true,
- new ExchangeDeclare("exchange", "type", "alternate", null));
- frame(Frame.L4, Frame.METHOD, false, true);
- frame(Frame.L4, Frame.HEADER, true, false);
- frame(Frame.L4, Frame.HEADER, false, false);
- frame(Frame.L4, Frame.HEADER, false, true);
- frame(Frame.L4, Frame.BODY, true, false);
- frame(Frame.L4, Frame.BODY, false, false);
- frame(Frame.L4, Frame.BODY, false, false);
- frame(Frame.L1, Frame.METHOD, true, true,
- new ExchangeDeclare("exchange", "type", "alternate", null));
- frame(Frame.L4, Frame.BODY, false, false);
- frame(Frame.L4, Frame.BODY, false, true);
- }
-
-}
-
-class SessionDelegateStub extends SessionDelegate {
-
- public static final ConnectionDelegate source()
- {
- return new ConnectionDelegate()
- {
- public SessionDelegate getSessionDelegate()
- {
- return new SessionDelegateStub();
- }
- };
- }
-
- public @Override void queueDeclare(Session session, QueueDeclare qd) {
- System.out.println("got a queue declare: " + qd.getQueue());
- }
-
- public @Override void exchangeDeclare(Session session, ExchangeDeclare ed) {
- System.out.println("got an exchange declare: " + ed.getExchange() + ", " + ed.getType());
- session.queueDeclare("asdf", "alternate", null);
- }
-
- public void data(Session ssn, Frame frame)
- {
- System.out.println("got data: " + frame);
- }
-
- public void headers(Session ssn, Struct ... headers)
- {
- System.out.println("got headers: " + headers);
- }
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
index 0a48a0a990..f69b72227c 100644
--- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
+++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
@@ -20,7 +20,10 @@
*/
package org.apache.qpidity;
-import static org.apache.qpidity.Functions.str;
+import org.apache.qpidity.transport.*;
+import org.apache.qpidity.transport.network.mina.MinaHandler;
+
+import static org.apache.qpidity.transport.util.Functions.str;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -44,10 +47,10 @@ class ToyBroker extends SessionDelegate
private ToyExchange exchange;
private MessageTransfer xfr = null;
private DeliveryProperties props = null;
- private Struct[] headers = null;
- private List<Frame> frames = null;
+ private Header header = null;
+ private List<Data> body = null;
private Map<String,Consumer> consumers = new ConcurrentHashMap<String,Consumer>();
-
+
public ToyBroker(ToyExchange exchange)
{
this.exchange = exchange;
@@ -58,7 +61,7 @@ class ToyBroker extends SessionDelegate
exchange.createQueue(qd.getQueue());
System.out.println("\n==================> declared queue: " + qd.getQueue() + "\n");
}
-
+
@Override public void queueBind(Session ssn, QueueBind qb)
{
exchange.bindQueue(qb.getExchange(), qb.getRoutingKey(),qb.getQueue());
@@ -70,22 +73,22 @@ class ToyBroker extends SessionDelegate
QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue());
ssn.executionResult(qq.getId(), result);
}
-
+
@Override public void messageSubscribe(Session ssn, MessageSubscribe ms)
{
Consumer c = new Consumer();
c._queueName = ms.getQueue();
consumers.put(ms.getDestination(),c);
- System.out.println("\n==================> message subscribe : " + ms.getDestination() + " queue: " + ms.getQueue() + "\n");
- }
-
+ System.out.println("\n==================> message subscribe : " + ms.getDestination() + " queue: " + ms.getQueue() + "\n");
+ }
+
@Override public void messageFlow(Session ssn,MessageFlow struct)
{
Consumer c = consumers.get(struct.getDestination());
c._credit = struct.getValue();
System.out.println("\n==================> message flow : " + struct.getDestination() + " credit: " + struct.getValue() + "\n");
}
-
+
@Override public void messageFlush(Session ssn,MessageFlush struct)
{
System.out.println("\n==================> message flush for consumer : " + struct.getDestination() + "\n");
@@ -95,47 +98,44 @@ class ToyBroker extends SessionDelegate
@Override public void messageTransfer(Session ssn, MessageTransfer xfr)
{
this.xfr = xfr;
- frames = new ArrayList<Frame>();
+ body = new ArrayList<Data>();
System.out.println("received transfer " + xfr.getDestination());
}
- public void headers(Session ssn, Struct ... headers)
+ @Override public void header(Session ssn, Header header)
{
- if (xfr == null || frames == null)
+ if (xfr == null || body == null)
{
ssn.connectionClose(503, "no method segment", 0, 0);
- // XXX: close at our end
+ ssn.close();
return;
}
- for (Struct hdr : headers)
+ props = header.get(DeliveryProperties.class);
+ if (props != null)
{
- if (hdr instanceof DeliveryProperties)
- {
- props = (DeliveryProperties) hdr;
- System.out.println("received headers routing_key " + props.getRoutingKey());
- }
+ System.out.println("received headers routing_key " + props.getRoutingKey());
}
-
- this.headers = headers;
+
+ this.header = header;
}
- public void data(Session ssn, Frame frame)
+ @Override public void data(Session ssn, Data data)
{
- if (xfr == null || frames == null)
+ if (xfr == null || body == null)
{
ssn.connectionClose(503, "no method segment", 0, 0);
- // XXX: close at our end
+ ssn.close();
return;
}
- frames.add(frame);
+ body.add(data);
- if (frame.isLastSegment() && frame.isLastFrame())
+ if (data.isLast())
{
String dest = xfr.getDestination();
- Message m = new Message(headers, frames);
-
+ Message m = new Message(header, body);
+
if (exchange.route(dest,props.getRoutingKey(),m))
{
System.out.println("queued " + m);
@@ -143,12 +143,12 @@ class ToyBroker extends SessionDelegate
}
else
{
-
+
reject(ssn);
}
ssn.processed(xfr);
xfr = null;
- frames = null;
+ body = null;
}
}
@@ -165,22 +165,22 @@ class ToyBroker extends SessionDelegate
ssn.messageReject(ranges, 0, "no such destination");
}
}
-
+
private void transferMessageToPeer(Session ssn,String dest, Message m)
{
System.out.println("\n==================> Transfering message to: " +dest + "\n");
ssn.messageTransfer(dest, (short)0, (short)0);
- ssn.headers(m.headers);
- for (Frame f : m.frames)
+ ssn.header(m.header);
+ for (Data d : m.body)
{
- for (ByteBuffer b : f)
+ for (ByteBuffer b : d.getFragments())
{
ssn.data(b);
}
}
ssn.endData();
}
-
+
private void dispatchMessages(Session ssn)
{
for (String dest: consumers.keySet())
@@ -188,8 +188,8 @@ class ToyBroker extends SessionDelegate
checkAndSendMessagesToConsumer(ssn,dest);
}
}
-
- private void checkAndSendMessagesToConsumer(Session ssn,String dest)
+
+ private void checkAndSendMessagesToConsumer(Session ssn,String dest)
{
Consumer c = consumers.get(dest);
LinkedBlockingQueue<Message> queue = exchange.getQueue(c._queueName);
@@ -204,33 +204,33 @@ class ToyBroker extends SessionDelegate
class Message
{
- private final Struct[] headers;
- private final List<Frame> frames;
+ private final Header header;
+ private final List<Data> body;
- public Message(Struct[] headers, List<Frame> frames)
+ public Message(Header header, List<Data> body)
{
- this.headers = headers;
- this.frames = frames;
+ this.header = header;
+ this.body = body;
}
public String toString()
{
StringBuilder sb = new StringBuilder();
- if (headers != null)
+ if (header != null)
{
boolean first = true;
- for (Struct hdr : headers)
+ for (Struct st : header.getStructs())
{
if (first) { first = false; }
else { sb.append(" "); }
- sb.append(hdr);
+ sb.append(st);
}
}
- for (Frame f : frames)
+ for (Data d : body)
{
- for (ByteBuffer b : f)
+ for (ByteBuffer b : d.getFragments())
{
sb.append(" | ");
sb.append(str(b));
@@ -241,7 +241,7 @@ class ToyBroker extends SessionDelegate
}
}
-
+
// ugly, but who cares :)
// assumes unit is always no of messages, not bytes
// assumes it's credit mode and not window
@@ -253,7 +253,7 @@ class ToyBroker extends SessionDelegate
public static final void main(String[] args) throws IOException
{
- final ToyExchange exchange = new ToyExchange();
+ final ToyExchange exchange = new ToyExchange();
ConnectionDelegate delegate = new ConnectionDelegate()
{
public SessionDelegate getSessionDelegate()
@@ -261,11 +261,11 @@ class ToyBroker extends SessionDelegate
return new ToyBroker(exchange);
}
};
-
+
//hack
delegate.setUsername("guest");
delegate.setPassword("guest");
-
+
MinaHandler.accept("0.0.0.0", 5672, delegate);
}
diff --git a/java/common/src/main/java/org/apache/qpidity/ToyClient.java b/java/common/src/main/java/org/apache/qpidity/ToyClient.java
index e325fb93be..d924c55c7b 100644
--- a/java/common/src/main/java/org/apache/qpidity/ToyClient.java
+++ b/java/common/src/main/java/org/apache/qpidity/ToyClient.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpidity;
+import org.apache.qpidity.transport.*;
+import org.apache.qpidity.transport.network.mina.MinaHandler;
+
/**
* ToyClient
@@ -42,17 +45,17 @@ class ToyClient extends SessionDelegate
}
}
- public void headers(Session ssn, Struct ... headers)
+ @Override public void header(Session ssn, Header header)
{
- for (Struct hdr : headers)
+ for (Struct st : header.getStructs())
{
- System.out.println("header: " + hdr);
+ System.out.println("header: " + st);
}
}
- public void data(Session ssn, Frame frame)
+ @Override public void data(Session ssn, Data data)
{
- System.out.println("got data: " + frame);
+ System.out.println("got data: " + data);
}
public static final void main(String[] args)
@@ -65,7 +68,7 @@ class ToyClient extends SessionDelegate
return new ToyClient();
}
});
- conn.getOutputHandler().handle(conn.getHeader().toByteBuffer());
+ conn.send(new ConnectionEvent(0, new ProtocolHeader(1, 0, 10)));
Channel ch = conn.getChannel(0);
Session ssn = new Session();
@@ -76,8 +79,8 @@ class ToyClient extends SessionDelegate
ssn.sync();
ssn.messageTransfer("asdf", (short) 0, (short) 1);
- ssn.headers(new DeliveryProperties(),
- new MessageProperties());
+ ssn.header(new DeliveryProperties(),
+ new MessageProperties());
ssn.data("this is the data");
ssn.endData();
@@ -88,6 +91,8 @@ class ToyClient extends SessionDelegate
Future<QueueQueryResult> future = ssn.queueQuery("asdf");
System.out.println(future.get().getQueue());
+ ssn.close();
+ conn.close();
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/TrackSwitch.java b/java/common/src/main/java/org/apache/qpidity/TrackSwitch.java
deleted file mode 100644
index 28a7d75f05..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/TrackSwitch.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpidity;
-
-
-/**
- * A TrackSwitch sends incoming frames to a different handler based on
- * track.
- *
- * @author Rafael H. Schloming
- */
-
-class TrackSwitch<C> extends Switch<Byte,Event<C,Frame>>
-{
-
- public Byte resolve(Event<C,Frame> event)
- {
- return event.target.getTrack();
- }
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/TypeSwitch.java b/java/common/src/main/java/org/apache/qpidity/TypeSwitch.java
deleted file mode 100644
index fc53b0b9b4..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/TypeSwitch.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpidity;
-
-
-/**
- * A TypeSwitch sends incoming frames to a different handler based on
- * type.
- *
- * @author Rafael H. Schloming
- */
-
-class TypeSwitch<C> extends Switch<Byte,Event<C,Frame>>
-{
-
- public Byte resolve(Event<C,Frame> event)
- {
- return event.target.getType();
- }
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/api/Message.java b/java/common/src/main/java/org/apache/qpidity/api/Message.java
index ab45ee01aa..afb17547ac 100644
--- a/java/common/src/main/java/org/apache/qpidity/api/Message.java
+++ b/java/common/src/main/java/org/apache/qpidity/api/Message.java
@@ -3,8 +3,8 @@ package org.apache.qpidity.api;
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.qpidity.MessageProperties;
-import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
+import org.apache.qpidity.transport.DeliveryProperties;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -14,9 +14,9 @@ import org.apache.qpidity.DeliveryProperties;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,13 +30,13 @@ public interface Message
public MessageProperties getMessageProperties();
public DeliveryProperties getDeliveryProperties();
-
+
/**
* This will abstract the underlying message data.
* The Message implementation may not hold all message
* data in memory (especially in the case of large messages)
- *
- * The appendData function might write data to
+ *
+ * The appendData function might write data to
* <ul>
* <li> Memory (Ex: ByteBuffer)
* <li> To Disk
@@ -50,54 +50,54 @@ public interface Message
* This will abstract the underlying message data.
* The Message implementation may not hold all message
* data in memory (especially in the case of large messages)
- *
- * The appendData function might write data to
+ *
+ * The appendData function might write data to
* <ul>
* <li> Memory (Ex: ByteBuffer)
* <li> To Disk
* <li> To Socket (Stream)
* </ul>
* @param src - the data to append
- */
+ */
public void appendData(ByteBuffer src) throws IOException;
-
+
/**
* This will abstract the underlying message data.
* The Message implementation may not hold all message
* data in memory (especially in the case of large messages)
- *
+ *
* The read function might copy data from
* <ul>
* <li> From memory (Ex: ByteBuffer)
* <li> From Disk
* <li> From Socket as and when it gets streamed
* </ul>
- * @param target The target byte[] which the data gets copied to
+ * @param target The target byte[] which the data gets copied to
*/
- public void readData(byte[] target) throws IOException;
-
+ public void readData(byte[] target) throws IOException;
+
/**
* * This will abstract the underlying message data.
* The Message implementation may not hold all message
* data in memory (especially in the case of large messages)
- *
+ *
* The read function might copy data from
* <ul>
* <li> From memory (Ex: ByteBuffer)
* <li> From Disk
* <li> From Socket as and when it gets streamed
* </ul>
- *
+ *
* @return A ByteBuffer containing data
* @throws IOException
*/
- public ByteBuffer readData() throws IOException;
-
+ public ByteBuffer readData() throws IOException;
+
/**
* This should clear the body of the message.
*/
public void clearData();
-
+
/**
* The provides access to the command Id assigned to the
* message transfer.
@@ -107,15 +107,14 @@ public interface Message
* you could use this id to accquire it.
* <li>For releasing a message. You can use this id to release an acquired
* message
- * <li>For Acknowledging a message - You need to pass this ID, in order to
+ * <li>For Acknowledging a message - You need to pass this ID, in order to
* acknowledge the message
* <li>For Rejecting a message - You need to pass this ID, in order to reject
- * the message.
+ * the message.
* </ul>
- *
+ *
* @return the message transfer id.
*/
public long getMessageTransferId();
-
-}
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java b/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java
index 134c9cdf47..cfb9dfbe92 100644
--- a/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java
@@ -23,10 +23,10 @@ package org.apache.qpidity.codec;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpidity.RangeSet;
-import org.apache.qpidity.Struct;
+import org.apache.qpidity.transport.RangeSet;
+import org.apache.qpidity.transport.Struct;
-import static org.apache.qpidity.Functions.*;
+import static org.apache.qpidity.transport.util.Functions.*;
/**
diff --git a/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java b/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java
index b6f875bc5d..0cc5a4157a 100644
--- a/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java
@@ -25,11 +25,11 @@ import java.nio.ByteBuffer;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpidity.Range;
-import org.apache.qpidity.RangeSet;
-import org.apache.qpidity.Struct;
+import org.apache.qpidity.transport.Range;
+import org.apache.qpidity.transport.RangeSet;
+import org.apache.qpidity.transport.Struct;
-import static org.apache.qpidity.Functions.*;
+import static org.apache.qpidity.transport.util.Functions.*;
/**
diff --git a/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java b/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java
index ec91c877f8..0d224056de 100644
--- a/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java
@@ -23,8 +23,8 @@ package org.apache.qpidity.codec;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpidity.RangeSet;
-import org.apache.qpidity.Struct;
+import org.apache.qpidity.transport.RangeSet;
+import org.apache.qpidity.transport.Struct;
/**
diff --git a/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java b/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java
index b879ed5cd7..9a79ec70fa 100644
--- a/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java
@@ -23,8 +23,8 @@ package org.apache.qpidity.codec;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpidity.RangeSet;
-import org.apache.qpidity.Struct;
+import org.apache.qpidity.transport.RangeSet;
+import org.apache.qpidity.transport.Struct;
/**
diff --git a/java/common/src/main/java/org/apache/qpidity/codec/SegmentEncoder.java b/java/common/src/main/java/org/apache/qpidity/codec/SegmentEncoder.java
deleted file mode 100644
index 8fb396f278..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/codec/SegmentEncoder.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpidity.codec;
-
-import java.nio.BufferOverflowException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-import org.apache.qpidity.Frame;
-import org.apache.qpidity.Handler;
-
-import static java.lang.Math.*;
-
-import static org.apache.qpidity.Frame.*;
-
-
-/**
- * SegmentEncoder
- *
- * @author Rafael H. Schloming
- */
-
-public class SegmentEncoder extends AbstractEncoder
-{
-
- private final Handler<ByteBuffer> handler;
- private final int max;
- private final byte flags;
- private final byte track;
- private final byte type;
- private final int channel;
-
- private int remaining;
- private ByteBuffer frame;
- private boolean first;
-
- public SegmentEncoder(byte major, byte minor, Handler<ByteBuffer> handler,
- int max, byte flags, byte track, byte type,
- int channel, int remaining)
- {
- super(major, minor);
- if (max < HEADER_SIZE + 1)
- {
- throw new IllegalArgumentException
- ("max frame size must be large enough to include header");
- }
-
- this.handler = handler;
- this.max = max;
- this.flags = flags;
- this.track = track;
- this.type = type;
- this.channel = channel;
- this.remaining = remaining;
- this.frame = null;
- this.first = true;
- }
-
- private void preWrite() {
- if (remaining == 0)
- {
- throw new BufferOverflowException();
- }
-
- if (frame == null)
- {
- frame = ByteBuffer.allocate(min(max, remaining + HEADER_SIZE));
- frame.order(ByteOrder.BIG_ENDIAN);
-
- byte frameFlags = flags;
- if (first) { frameFlags |= FIRST_FRAME; first = false; }
- if (remaining <= (frame.remaining() - HEADER_SIZE))
- {
- frameFlags |= LAST_FRAME;
- }
-
- frame.put(frameFlags);
- frame.put(type);
- frame.putShort((short) frame.limit());
- frame.put(RESERVED);
- frame.put(track);
- frame.putShort((short) channel);
- frame.put(RESERVED);
- frame.put(RESERVED);
- frame.put(RESERVED);
- frame.put(RESERVED);
-
- assert frame.position() == HEADER_SIZE;
- }
- }
-
- private void postWrite() {
- if (!frame.hasRemaining())
- {
- frame.flip();
- handler.handle(frame);
- frame = null;
- }
- }
-
- @Override public void put(byte b)
- {
- preWrite();
- frame.put(b);
- remaining -= 1;
- postWrite();
- }
-
- @Override public void put(ByteBuffer src)
- {
- if (src.remaining() > remaining)
- {
- throw new BufferOverflowException();
- }
-
- while (src.hasRemaining())
- {
- preWrite();
- int limit = src.limit();
- src.limit(src.position() + min(frame.remaining(), src.remaining()));
- remaining -= src.remaining();
- frame.put(src);
- src.limit(limit);
- postWrite();
- }
- }
-
- public static final void main(String[] args) {
- ByteBuffer buf = ByteBuffer.allocate(1024);
- buf.put("AMQP_PROTOCOL_HEADER".getBytes());
- buf.flip();
-
- SegmentEncoder enc = new SegmentEncoder((byte) 0, (byte) 10,
- new Handler<ByteBuffer>()
- {
- public void handle(ByteBuffer frame)
- {
- System.out.println(frame);
- }
- },
- 16,
- (byte) 0x0,
- (byte) Frame.L1,
- (byte) Frame.METHOD,
- 0,
- 7 + buf.remaining());
- enc.put((byte)0);
- enc.put((byte)1);
- enc.put((byte)2);
- enc.put((byte)3);
- enc.put((byte)4);
- enc.put((byte)5);
- enc.put((byte)6);
- enc.put(buf);
- }
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/ProtocolActions.java b/java/common/src/main/java/org/apache/qpidity/transport/AbstractDelegate.java
index c658320989..a8059d669f 100644
--- a/java/common/src/main/java/org/apache/qpidity/ProtocolActions.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/AbstractDelegate.java
@@ -18,22 +18,23 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
/**
- * ProtocolActions
+ * AbstractDelegate
*
- * @author Rafael H. Schloming
*/
-interface ProtocolActions
+class AbstractDelegate<C>
{
- void init(ProtocolHeader header);
+ public void init(C context, ProtocolHeader header) {}
- void frame(Frame frame);
+ public void error(C context, ProtocolError error) {}
- void error(ProtocolError error);
+ public void header(C context, Header header) {}
+
+ public void data(C context, Data data) {}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Channel.java b/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
new file mode 100644
index 0000000000..426f954c17
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
@@ -0,0 +1,182 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport;
+
+import java.nio.ByteBuffer;
+
+import java.util.List;
+import java.util.ArrayList;
+
+import static org.apache.qpidity.transport.network.Frame.*;
+import static org.apache.qpidity.transport.util.Functions.*;
+
+
+/**
+ * Channel
+ *
+ * @author Rafael H. Schloming
+ */
+
+public class Channel extends Invoker implements Receiver<ProtocolEvent>
+{
+
+ final private Connection connection;
+ final private int channel;
+ final private Delegate<Channel> delegate;
+ final private SessionDelegate sessionDelegate;
+ // session may be null
+ private Session session;
+
+ private boolean first = true;
+ private ByteBuffer data = null;
+
+ public Channel(Connection connection, int channel, SessionDelegate delegate)
+ {
+ this.connection = connection;
+ this.channel = channel;
+ this.delegate = new ChannelDelegate();
+ this.sessionDelegate = delegate;
+ }
+
+ public Connection getConnection()
+ {
+ return connection;
+ }
+
+ public void received(ProtocolEvent event)
+ {
+ switch (event.getEncodedTrack())
+ {
+ case L1:
+ event.delegate(this, connection.getConnectionDelegate());
+ break;
+ case L2:
+ event.delegate(this, delegate);
+ break;
+ case L3:
+ event.delegate(session, sessionDelegate);
+ break;
+ case L4:
+ // XXX
+ if (event instanceof Method)
+ {
+ Method method = (Method) event;
+ method.setId(session.nextCommandId());
+ method.delegate(session, sessionDelegate);
+ if (!method.hasPayload())
+ {
+ session.processed(method);
+ }
+ }
+ else
+ {
+ event.delegate(session, sessionDelegate);
+ }
+ break;
+ default:
+ throw new IllegalStateException
+ ("unknown track: " + event.getEncodedTrack());
+ }
+ }
+
+ public void closed()
+ {
+ System.out.println("channel closed: " + this);
+ }
+
+ public void close()
+ {
+ connection.removeChannel(channel);
+ }
+
+ public int getEncodedChannel() {
+ return channel;
+ }
+
+ public Session getSession()
+ {
+ return session;
+ }
+
+ void setSession(Session session)
+ {
+ this.session = session;
+ }
+
+ private void emit(ProtocolEvent event)
+ {
+ connection.send(new ConnectionEvent(channel, event));
+ }
+
+ public void method(Method m)
+ {
+ emit(m);
+
+ if (m.getEncodedTrack() != L4)
+ {
+ System.out.println("sent control " + m.getClass().getName());
+ }
+ }
+
+ public void header(Header header)
+ {
+ emit(header);
+ }
+
+ public void data(ByteBuffer buf)
+ {
+ if (data != null)
+ {
+ emit(new Data(data, first, false));
+ first = false;
+ }
+
+ data = buf;
+ }
+
+ public void data(String str)
+ {
+ data(str.getBytes());
+ }
+
+ public void data(byte[] bytes)
+ {
+ data(ByteBuffer.wrap(bytes));
+ }
+
+ public void end()
+ {
+ emit(new Data(data, first, true));
+ first = true;
+ data = null;
+ }
+
+ protected void invoke(Method m)
+ {
+ method(m);
+ }
+
+ protected <T> Future<T> invoke(Method m, Class<T> cls)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/ChannelDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java
index d486868621..9d28b1a81e 100644
--- a/java/common/src/main/java/org/apache/qpidity/ChannelDelegate.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
import java.util.UUID;
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Connection.java b/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
new file mode 100644
index 0000000000..2c68b7c38b
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * Connection
+ *
+ * @author Rafael H. Schloming
+ *
+ * @todo the channels map should probably be replaced with something
+ * more efficient, e.g. an array or a map implementation that can use
+ * short instead of Short
+ */
+
+// RA making this public until we sort out the package issues
+public class Connection
+ implements Receiver<ConnectionEvent>, Sender<ConnectionEvent>
+{
+
+ final private Sender<ConnectionEvent> sender;
+ final private ConnectionDelegate delegate;
+
+ final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>();
+
+ public Connection(Sender<ConnectionEvent> sender,
+ ConnectionDelegate delegate)
+ {
+ this.sender = sender;
+ this.delegate = delegate;
+ }
+
+ public ConnectionDelegate getConnectionDelegate()
+ {
+ return delegate;
+ }
+
+ public void received(ConnectionEvent event)
+ {
+ Channel channel = getChannel(event.getChannel());
+ channel.received(event.getProtocolEvent());
+ }
+
+ public void send(ConnectionEvent event)
+ {
+ sender.send(event);
+ }
+
+ public Channel getChannel(int number)
+ {
+ synchronized (channels)
+ {
+ Channel channel = channels.get(number);
+ if (channel == null)
+ {
+ channel = new Channel(this, number, delegate.getSessionDelegate());
+ channels.put(number, channel);
+ }
+ return channel;
+ }
+ }
+
+ void removeChannel(int number)
+ {
+ synchronized (channels)
+ {
+ channels.remove(number);
+ }
+ }
+
+ public void closed()
+ {
+ System.out.println("connection closed: " + this);
+ }
+
+ public void close()
+ {
+ sender.close();
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java
index ff89567cee..d500cc6b81 100644
--- a/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java
@@ -18,7 +18,10 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
+
+import org.apache.qpidity.SecurityHelper;
+import org.apache.qpidity.QpidException;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
@@ -41,7 +44,7 @@ import javax.security.sasl.SaslServer;
/**
* Currently only implemented client specific methods
* the server specific methods are dummy impls for testing
- *
+ *
* the connectionClose is kind of different for both sides
*/
public abstract class ConnectionDelegate extends Delegate<Channel>
@@ -56,23 +59,47 @@ public abstract class ConnectionDelegate extends Delegate<Channel>
private int maxFrame = 64*1024;
private Condition _negotiationComplete;
private Lock _negotiationCompleteLock;
-
+
public abstract SessionDelegate getSessionDelegate();
-
+
public void setCondition(Lock negotiationCompleteLock,Condition negotiationComplete)
{
_negotiationComplete = negotiationComplete;
_negotiationCompleteLock = negotiationCompleteLock;
}
-
+
+ @Override public void init(Channel ch, ProtocolHeader hdr)
+ {
+ System.out.println(hdr);
+ // XXX: hardcoded version
+ if (hdr.getMajor() != 0 && hdr.getMinor() != 10)
+ {
+ // XXX
+ ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader(1, 0, 10)));
+ ch.getConnection().close();
+ }
+ else
+ {
+
+ System.out.println("\n--------------------Broker Start Connection Negotiation -----------------------\n");
+
+ ch.connectionStart(hdr.getMajor(), hdr.getMinor(), null, "PLAIN", "utf8");
+ }
+ }
+
+ @Override public void error(Channel ch, ProtocolError error)
+ {
+ throw new RuntimeException(error.getMessage());
+ }
+
// ----------------------------------------------
- // Client side
+ // Client side
//-----------------------------------------------
- @Override public void connectionStart(Channel context, ConnectionStart struct)
+ @Override public void connectionStart(Channel context, ConnectionStart struct)
{
System.out.println("\n--------------------Client Start Connection Negotiation -----------------------\n");
System.out.println("The broker has sent connection-start");
-
+
String mechanism = null;
String response = null;
try
@@ -94,15 +121,15 @@ public abstract class ConnectionDelegate extends Delegate<Channel>
{
// need error handling
}
-
- Map<String,?> props = new HashMap<String,String>();
+
+ Map<String,?> props = new HashMap<String,String>();
context.connectionStartOk(props, mechanism, response, _locale);
}
-
- @Override public void connectionSecure(Channel context, ConnectionSecure struct)
+
+ @Override public void connectionSecure(Channel context, ConnectionSecure struct)
{
System.out.println("The broker has sent connection-secure with chanllenge " + struct.getChallenge());
-
+
try
{
String response = new String(saslClient.evaluateChallenge(struct.getChallenge().getBytes()),_locale);
@@ -115,20 +142,20 @@ public abstract class ConnectionDelegate extends Delegate<Channel>
catch (SaslException e)
{
// need error handling
- }
+ }
}
-
- @Override public void connectionTune(Channel context, ConnectionTune struct)
+
+ @Override public void connectionTune(Channel context, ConnectionTune struct)
{
System.out.println("The broker has sent connection-tune " + struct.toString());
-
+
// should update the channel max given by the broker.
- context.connectionTuneOk(struct.getChannelMax(), struct.getFrameMax(), struct.getHeartbeat());
+ context.connectionTuneOk(struct.getChannelMax(), struct.getFrameMax(), struct.getHeartbeat());
context.connectionOpen(_virtualHost, null, Option.INSIST);
}
-
-
- @Override public void connectionOpenOk(Channel context, ConnectionOpenOk struct)
+
+
+ @Override public void connectionOpenOk(Channel context, ConnectionOpenOk struct)
{
String knownHosts = struct.getKnownHosts();
System.out.println("The broker has opened the connection for use");
@@ -147,23 +174,23 @@ public abstract class ConnectionDelegate extends Delegate<Channel>
}
System.out.println("\n-------------------- Client End Connection Negotiation -----------------------\n");
}
-
- public void connectionRedirect(Channel context, ConnectionRedirect struct)
+
+ public void connectionRedirect(Channel context, ConnectionRedirect struct)
{
// not going to bother at the moment
}
-
+
// ----------------------------------------------
- // Server side
+ // Server side
//-----------------------------------------------
- @Override public void connectionStartOk(Channel context, ConnectionStartOk struct)
+ @Override public void connectionStartOk(Channel context, ConnectionStartOk struct)
{
//set the client side locale on the server side
_locale = struct.getLocale();
_mechanism = struct.getMechanism();
-
+
System.out.println("The client has sent connection-start-ok");
-
+
//try
//{
//saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",null,SecurityHelper.createCallbackHandler(_mechanism,_username,_password));
@@ -183,11 +210,11 @@ public abstract class ConnectionDelegate extends Delegate<Channel>
}
catch(Exception e)
{
-
+
}
}
-
-
+
+
/*}
catch (SaslException e)
{
@@ -198,13 +225,13 @@ public abstract class ConnectionDelegate extends Delegate<Channel>
// need error handling
}*/
}
-
- @Override public void connectionTuneOk(Channel context, ConnectionTuneOk struct)
+
+ @Override public void connectionTuneOk(Channel context, ConnectionTuneOk struct)
{
System.out.println("The client has excepted the tune params");
}
-
- @Override public void connectionSecureOk(Channel context, ConnectionSecureOk struct)
+
+ @Override public void connectionSecureOk(Channel context, ConnectionSecureOk struct)
{
System.out.println("The client has sent connection-secure-ok");
try
@@ -225,11 +252,11 @@ public abstract class ConnectionDelegate extends Delegate<Channel>
}
catch(Exception e)
{
-
+
}
}
-
-
+
+
}
catch (SaslException e)
{
@@ -240,17 +267,16 @@ public abstract class ConnectionDelegate extends Delegate<Channel>
// need error handling
}
}
-
-
- @Override public void connectionOpen(Channel context, ConnectionOpen struct)
+
+
+ @Override public void connectionOpen(Channel context, ConnectionOpen struct)
{
String hosts = "amqp:1223243232325";
System.out.println("The client has sent connection-open");
context.connectionOpenOk(hosts);
System.out.println("\n-------------------- Broker End Connection Negotiation -----------------------\n");
}
-
-
+
public String getPassword()
{
return _password;
diff --git a/java/common/src/main/java/org/apache/qpidity/ProtocolError.java b/java/common/src/main/java/org/apache/qpidity/transport/ConnectionEvent.java
index a4a83fad35..6ed0df57a7 100644
--- a/java/common/src/main/java/org/apache/qpidity/ProtocolError.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/ConnectionEvent.java
@@ -18,30 +18,34 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
/**
- * ProtocolError
+ * ConnectionEvent
*
- * @author Rafael H. Schloming
*/
-class ProtocolError
+public class ConnectionEvent
{
- private final String format;
- private final Object[] args;
+ private final int channel;
+ private final ProtocolEvent event;
- public ProtocolError(String format, Object ... args)
+ public ConnectionEvent(int channel, ProtocolEvent event)
{
- this.format = format;
- this.args = args;
+ this.channel = channel;
+ this.event = event;
}
- public String getMessage()
+ public int getChannel()
{
- return String.format(format, args);
+ return channel;
+ }
+
+ public ProtocolEvent getProtocolEvent()
+ {
+ return event;
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Data.java b/java/common/src/main/java/org/apache/qpidity/transport/Data.java
new file mode 100644
index 0000000000..55cde84d5e
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Data.java
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport;
+
+import org.apache.qpidity.transport.network.Frame;
+
+import java.nio.ByteBuffer;
+
+import java.util.Collections;
+
+
+/**
+ * Data
+ *
+ */
+
+public class Data implements ProtocolEvent
+{
+
+ private final Iterable<ByteBuffer> fragments;
+ private final boolean first;
+ private final boolean last;
+
+ public Data(Iterable<ByteBuffer> fragments, boolean first, boolean last)
+ {
+ this.fragments = fragments;
+ this.first = first;
+ this.last = last;
+ }
+
+ public Data(ByteBuffer buf, boolean first, boolean last)
+ {
+ this(Collections.singletonList(buf), first, last);
+ }
+
+ public Iterable<ByteBuffer> getFragments()
+ {
+ return fragments;
+ }
+
+ public boolean isFirst()
+ {
+ return first;
+ }
+
+ public boolean isLast()
+ {
+ return last;
+ }
+
+ public byte getEncodedTrack()
+ {
+ return Frame.L4;
+ }
+
+ public <C> void delegate(C context, Delegate<C> delegate)
+ {
+ delegate.data(context, this);
+ }
+
+ public <C> void delegate(C context, Switch sw)
+ {
+ sw.data(context, this);
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/Future.java b/java/common/src/main/java/org/apache/qpidity/transport/Future.java
index 8902446e95..8936f06831 100644
--- a/java/common/src/main/java/org/apache/qpidity/Future.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Future.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
/**
diff --git a/java/common/src/main/java/org/apache/qpidity/Switch.java b/java/common/src/main/java/org/apache/qpidity/transport/Header.java
index 166dc33134..632dc137c1 100644
--- a/java/common/src/main/java/org/apache/qpidity/Switch.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Header.java
@@ -18,41 +18,59 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
+
+import org.apache.qpidity.transport.network.Frame;
+
+import java.util.List;
-import java.util.HashMap;
-import java.util.Map;
/**
- * A Switch performs generic event dispatch.
+ * Header
*
* @author Rafael H. Schloming
*/
-abstract class Switch<K,E> implements Handler<E>
-{
+public class Header implements ProtocolEvent {
- final private Map<K,Handler<E>> handlers =
- new HashMap<K,Handler<E>>();
+ private final List<Struct> structs;
- public void map(K key, Handler<E> handler)
+ public Header(List<Struct> structs)
{
- handlers.put(key, handler);
+ this.structs = structs;
}
- public void handle(E event)
+ public List<Struct> getStructs()
{
- K key = resolve(event);
- Handler<E> handler = handlers.get(key);
- if (handler == null)
+ return structs;
+ }
+
+ public <T> T get(Class<T> klass)
+ {
+ for (Struct st : structs)
{
- throw new IllegalStateException("no such key: " + key +
- " this = " + this +
- " handlers = " + handlers);
+ if (klass.isInstance(st))
+ {
+ return klass.cast(st);
+ }
}
- handler.handle(event);
+
+ return null;
+ }
+
+ public byte getEncodedTrack()
+ {
+ return Frame.L4;
}
- abstract K resolve(E event);
+ public <C> void delegate(C context, Delegate<C> delegate)
+ {
+ delegate.header(context, this);
+ }
+
+ public <C> void delegate(C context, Switch sw)
+ {
+ sw.header(context, this);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpidity/Method.java b/java/common/src/main/java/org/apache/qpidity/transport/Method.java
index 43865d36aa..edd9116a73 100644
--- a/java/common/src/main/java/org/apache/qpidity/Method.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Method.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
/**
@@ -27,7 +27,7 @@ package org.apache.qpidity;
* @author Rafael H. Schloming
*/
-public abstract class Method extends Struct
+public abstract class Method extends Struct implements ProtocolEvent
{
public static final Method create(int type)
@@ -54,11 +54,9 @@ public abstract class Method extends Struct
public abstract byte getEncodedTrack();
- // XXX: do we need a segment base type?
- public byte getSegmentType()
+ public <C> void delegate(C context, Switch sw)
{
- // XXX
- return Frame.METHOD;
+ sw.method(context, this);
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/Segment.java b/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java
index ee9f60fad8..cd9fb3b94a 100644
--- a/java/common/src/main/java/org/apache/qpidity/Segment.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java
@@ -18,57 +18,55 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-
-import java.nio.ByteBuffer;
-
-import static org.apache.qpidity.Functions.*;
+import org.apache.qpidity.transport.network.NetworkDelegate;
+import org.apache.qpidity.transport.network.NetworkEvent;
/**
- * Segment
+ * ProtocolError
*
* @author Rafael H. Schloming
*/
-class Segment implements Iterable<ByteBuffer>
+public class ProtocolError implements NetworkEvent, ProtocolEvent
{
- private final Collection<ByteBuffer> fragments = new ArrayList<ByteBuffer>();
+ private final byte track;
+ private final String format;
+ private final Object[] args;
- public void add(ByteBuffer fragment)
+ public ProtocolError(byte track, String format, Object ... args)
{
- fragments.add(fragment);
+ this.track = track;
+ this.format = format;
+ this.args = args;
}
- public Iterator<ByteBuffer> getFragments()
+ public byte getEncodedTrack()
{
- return new SliceIterator(fragments.iterator());
+ return track;
}
- public Iterator<ByteBuffer> iterator()
+ public String getMessage()
{
- return getFragments();
+ return String.format(format, args);
}
- public String toString()
+ public <C> void delegate(C context, Switch sw)
{
- StringBuilder str = new StringBuilder();
- String sep = " | ";
-
- for (ByteBuffer buf : this)
- {
- str.append(str(buf));
- str.append(sep);
- }
+ sw.error(context, this);
+ }
- str.setLength(str.length() - sep.length());
+ public void delegate(NetworkDelegate delegate)
+ {
+ delegate.error(this);
+ }
- return str.toString();
+ public <C> void delegate(C context, Delegate<C> delegate)
+ {
+ delegate.error(context, this);
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java b/java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java
index f5a040166e..e2adefba9e 100644
--- a/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java
@@ -18,30 +18,32 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
/**
- * A MethodDispatcher parses and dispatches a method segment.
+ * ProtocolEvent
*
- * @author Rafael H. Schloming
*/
-class MethodDispatcher<C> implements Handler<Event<C,Method>>
+public interface ProtocolEvent
{
- final private Delegate<C> delegate;
-
- public MethodDispatcher(Delegate<C> delegate)
+ public interface Switch<C>
{
- this.delegate = delegate;
+ void init(C context, ProtocolHeader header);
+ void method(C context, Method method);
+ void header(C context, Header header);
+ void data(C context, Data data);
+ void error(C context, ProtocolError error);
}
- public void handle(Event<C,Method> event)
- {
- Method method = event.target;
- System.out.println("\nDelegating " + method.getClass().getName() + " to " + delegate.getClass().getName() + "\n");
- method.delegate(event.context, delegate);
- }
+ // XXX: could do this switching with cascading defaults for the
+ // specific dispatch methods
+ <C> void delegate(C context, Switch sw);
+
+ <C> void delegate(C context, Delegate<C> delegate);
+
+ byte getEncodedTrack();
}
diff --git a/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java b/java/common/src/main/java/org/apache/qpidity/transport/ProtocolHeader.java
index 140d5ecbe3..50cae51171 100644
--- a/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/ProtocolHeader.java
@@ -18,10 +18,14 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
import java.nio.ByteBuffer;
+import org.apache.qpidity.transport.network.NetworkDelegate;
+import org.apache.qpidity.transport.network.NetworkEvent;
+import org.apache.qpidity.transport.network.Frame;
+
/**
* ProtocolHeader
@@ -31,7 +35,7 @@ import java.nio.ByteBuffer;
//RA making this public until we sort out the package issues
-public class ProtocolHeader
+public class ProtocolHeader implements NetworkEvent, ProtocolEvent
{
private static final byte[] AMQP = {'A', 'M', 'Q', 'P' };
@@ -48,6 +52,11 @@ public class ProtocolHeader
this.minor = minor;
}
+ public ProtocolHeader(int instance, int major, int minor)
+ {
+ this((byte) instance, (byte) major, (byte) minor);
+ }
+
public byte getInstance()
{
return instance;
@@ -63,6 +72,11 @@ public class ProtocolHeader
return minor;
}
+ public byte getEncodedTrack()
+ {
+ return Frame.L1;
+ }
+
public ByteBuffer toByteBuffer()
{
ByteBuffer buf = ByteBuffer.allocate(8);
@@ -75,6 +89,21 @@ public class ProtocolHeader
return buf;
}
+ public <C> void delegate(C context, Switch sw)
+ {
+ sw.init(context, this);
+ }
+
+ public void delegate(NetworkDelegate delegate)
+ {
+ delegate.init(this);
+ }
+
+ public <C> void delegate(C context, Delegate<C> delegate)
+ {
+ delegate.init(context, this);
+ }
+
public String toString()
{
return String.format("AMQP.%d %d-%d", instance, major, minor);
diff --git a/java/common/src/main/java/org/apache/qpidity/Range.java b/java/common/src/main/java/org/apache/qpidity/transport/Range.java
index 9da7112a6d..ed745bf5ec 100644
--- a/java/common/src/main/java/org/apache/qpidity/Range.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Range.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
import static java.lang.Math.*;
diff --git a/java/common/src/main/java/org/apache/qpidity/RangeSet.java b/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java
index 2e79ee8a72..dfaec3702c 100644
--- a/java/common/src/main/java/org/apache/qpidity/RangeSet.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
import java.util.Collection;
import java.util.Iterator;
diff --git a/java/common/src/main/java/org/apache/qpidity/Delegator.java b/java/common/src/main/java/org/apache/qpidity/transport/Receiver.java
index 00e769e0f2..65edb3a6ec 100644
--- a/java/common/src/main/java/org/apache/qpidity/Delegator.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Receiver.java
@@ -18,18 +18,19 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
/**
- * Delegator
+ * Receiver
*
- * @author Rafael H. Schloming
*/
-interface Delegator
+public interface Receiver<T>
{
- <C> void delegate(C context, Delegate<C> delegate);
+ void received(T msg);
+
+ void closed();
}
diff --git a/java/common/src/main/java/org/apache/qpidity/Result.java b/java/common/src/main/java/org/apache/qpidity/transport/Result.java
index 7fe6c869a4..2126a76a53 100644
--- a/java/common/src/main/java/org/apache/qpidity/Result.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Result.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
/**
diff --git a/java/common/src/main/java/org/apache/qpidity/Header.java b/java/common/src/main/java/org/apache/qpidity/transport/Sender.java
index 9b6373df19..6da8358bd6 100644
--- a/java/common/src/main/java/org/apache/qpidity/Header.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Sender.java
@@ -18,13 +18,19 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
/**
- * Header
+ * Sender
*
- * @author Rafael H. Schloming
*/
-public abstract class Header extends Struct {}
+public interface Sender<T>
+{
+
+ void send(T msg);
+
+ void close();
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/Session.java b/java/common/src/main/java/org/apache/qpidity/transport/Session.java
index 6f0bd5c757..59e8daae31 100644
--- a/java/common/src/main/java/org/apache/qpidity/Session.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Session.java
@@ -18,10 +18,14 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
+
+import org.apache.qpidity.transport.network.Frame;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
@@ -42,12 +46,12 @@ public class Session extends Invoker
// completed incoming commands
private final RangeSet processed = new RangeSet();
private Range syncPoint = null;
-
+
// outgoing command count
private long commandsOut = 0;
private Map<Long,Method> commands = new HashMap<Long,Method>();
private long mark = 0;
-
+
public Map<Long,Method> getOutstandingCommands()
{
@@ -104,19 +108,19 @@ public class Session extends Invoker
}
void flushProcessed()
- {
+ {
for (Range r: processed)
{
- System.out.println("Completed Range [" + r.getLower() + "," + r.getUpper() +"]" );
+ System.out.println("Completed Range [" + r.getLower() + "," + r.getUpper() +"]" );
}
- System.out.println("Notifying peer with execution complete");
+ System.out.println("Notifying peer with execution complete");
executionComplete(0, processed);
}
void syncPoint()
{
System.out.println("===========Request received to sync==========================");
-
+
Range range = new Range(0, getCommandsIn() - 1);
boolean flush;
synchronized (processed)
@@ -154,8 +158,8 @@ public class Session extends Invoker
for (long id = lower; id <= upper; id++)
{
commands.remove(id);
- }
-
+ }
+
if (commands.isEmpty())
{
System.out.println("\n All outstanding commands are completed !!!! \n");
@@ -177,15 +181,25 @@ public class Session extends Invoker
synchronized (commands)
{
System.out.println("sent command " + m.getClass().getName() + " command Id" + commandsOut);
- commands.put(commandsOut++, m);
+ commands.put(commandsOut++, m);
}
}
channel.method(m);
}
- public void headers(Struct ... headers)
+ public void header(Header header)
+ {
+ channel.header(header);
+ }
+
+ public void header(List<Struct> structs)
+ {
+ header(new Header(structs));
+ }
+
+ public void header(Struct ... structs)
{
- channel.headers(headers);
+ header(Arrays.asList(structs));
}
public void data(ByteBuffer buf)
@@ -210,7 +224,7 @@ public class Session extends Invoker
public void sync()
{
- System.out.println("calling sync()");
+ System.out.println("calling sync()");
synchronized (commands)
{
if (!commands.isEmpty())
@@ -221,7 +235,7 @@ public class Session extends Invoker
while (!commands.isEmpty())
{
try {
- System.out.println("\n============sync() waiting for commmands to be completed ==============\n");
+ System.out.println("\n============sync() waiting for commmands to be completed ==============\n");
commands.wait();
System.out.println("\n============sync() got notified=========================================\n");
}
@@ -314,4 +328,10 @@ public class Session extends Invoker
}
+ public void close()
+ {
+ sessionClose();
+ channel.close();
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java
index e6c107ced2..8b3c661075 100644
--- a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
/**
@@ -30,9 +30,7 @@ package org.apache.qpidity;
public abstract class SessionDelegate extends Delegate<Session>
{
- public abstract void headers(Session ssn, Struct ... headers);
-
- public abstract void data(Session ssn, Frame frame);
+ private static final Struct[] EMPTY_STRUCT_ARRAY = {};
@Override public void executionResult(Session ssn, ExecutionResult result)
{
@@ -47,7 +45,7 @@ public abstract class SessionDelegate extends Delegate<Session>
for (Range range : ranges)
{
System.out.println("completed command range: " + range.getLower() + " to " + range.getUpper());
- ssn.complete(range.getLower(), range.getUpper());
+ ssn.complete(range.getLower(), range.getUpper());
}
}
ssn.complete(excmp.getCumulativeExecutionMark());
diff --git a/java/common/src/main/java/org/apache/qpidity/Struct.java b/java/common/src/main/java/org/apache/qpidity/transport/Struct.java
index 50da4910ab..f6464780e7 100644
--- a/java/common/src/main/java/org/apache/qpidity/Struct.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Struct.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
import org.apache.qpidity.codec.Encodable;
@@ -29,7 +29,7 @@ import org.apache.qpidity.codec.Encodable;
* @author Rafael H. Schloming
*/
-public abstract class Struct implements Delegator, Encodable
+public abstract class Struct implements Encodable
{
public static Struct create(int type)
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java
new file mode 100644
index 0000000000..e17b37bb36
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java
@@ -0,0 +1,184 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport.network;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.codec.FragmentDecoder;
+
+import org.apache.qpidity.transport.ConnectionEvent;
+import org.apache.qpidity.transport.Data;
+import org.apache.qpidity.transport.Header;
+import org.apache.qpidity.transport.Method;
+import org.apache.qpidity.transport.ProtocolError;
+import org.apache.qpidity.transport.ProtocolEvent;
+import org.apache.qpidity.transport.ProtocolHeader;
+import org.apache.qpidity.transport.Receiver;
+import org.apache.qpidity.transport.Struct;
+
+
+/**
+ * Assembler
+ *
+ */
+
+public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
+{
+
+ private final Receiver<ConnectionEvent> receiver;
+ private final byte major;
+ private final byte minor;
+ private final Map<Integer,List<ByteBuffer>> segments;
+
+ public Assembler(Receiver<ConnectionEvent> receiver, byte major, byte minor)
+ {
+ this.receiver = receiver;
+ this.major = major;
+ this.minor = minor;
+ segments = new HashMap<Integer,List<ByteBuffer>>();
+ }
+
+ private int segmentKey(Frame frame)
+ {
+ // XXX: can this overflow?
+ return (frame.getTrack() + 1) * frame.getChannel();
+ }
+
+ private List<ByteBuffer> getSegment(Frame frame)
+ {
+ return segments.get(segmentKey(frame));
+ }
+
+ private void setSegment(Frame frame, List<ByteBuffer> segment)
+ {
+ int key = segmentKey(frame);
+ if (segments.containsKey(key))
+ {
+ error(new ProtocolError(Frame.L2, "segment in progress: %s",
+ frame));
+ }
+ segments.put(segmentKey(frame), segment);
+ }
+
+ private void clearSegment(Frame frame)
+ {
+ segments.remove(segmentKey(frame));
+ }
+
+ private void emit(int channel, ProtocolEvent event)
+ {
+ receiver.received(new ConnectionEvent(channel, event));
+ }
+
+ private void emit(Frame frame, ProtocolEvent event)
+ {
+ emit(frame.getChannel(), event);
+ }
+
+ public void received(NetworkEvent event)
+ {
+ event.delegate(this);
+ }
+
+ public void closed()
+ {
+ this.receiver.closed();
+ }
+
+ public void init(ProtocolHeader header)
+ {
+ emit(0, header);
+ }
+
+ public void frame(Frame frame)
+ {
+ switch (frame.getType())
+ {
+ case Frame.BODY:
+ emit(frame, new Data(frame, frame.isFirstFrame(),
+ frame.isLastFrame()));
+ break;
+ default:
+ assemble(frame);
+ break;
+ }
+ }
+
+ public void error(ProtocolError error)
+ {
+ emit(0, error);
+ }
+
+ private void assemble(Frame frame)
+ {
+ List<ByteBuffer> segment;
+ if (frame.isFirstFrame())
+ {
+ segment = new ArrayList<ByteBuffer>();
+ setSegment(frame, segment);
+ }
+ else
+ {
+ segment = getSegment(frame);
+ }
+
+ for (ByteBuffer buf : frame)
+ {
+ segment.add(buf);
+ }
+
+ if (frame.isLastFrame())
+ {
+ clearSegment(frame);
+ emit(frame, decode(frame.getType(), segment));
+ }
+ }
+
+ private ProtocolEvent decode(byte type, List<ByteBuffer> segment)
+ {
+ FragmentDecoder dec =
+ new FragmentDecoder(major, minor, segment.iterator());
+
+ switch (type)
+ {
+ case Frame.METHOD:
+ int methodType = (int) dec.readLong();
+ Method method = Method.create(methodType);
+ method.read(dec, major, minor);
+ return method;
+ case Frame.HEADER:
+ List<Struct> structs = new ArrayList();
+ while (dec.hasRemaining())
+ {
+ structs.add(dec.readLongStruct());
+ }
+ return new Header(structs);
+ default:
+ throw new IllegalStateException("unknown frame type: " + type);
+ }
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java
new file mode 100644
index 0000000000..73be9c3492
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java
@@ -0,0 +1,168 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport.network;
+
+import org.apache.qpidity.codec.BBEncoder;
+import org.apache.qpidity.codec.SizeEncoder;
+
+import org.apache.qpidity.transport.ConnectionEvent;
+import org.apache.qpidity.transport.Data;
+import org.apache.qpidity.transport.Header;
+import org.apache.qpidity.transport.Method;
+import org.apache.qpidity.transport.ProtocolHeader;
+import org.apache.qpidity.transport.ProtocolError;
+import org.apache.qpidity.transport.ProtocolEvent;
+import org.apache.qpidity.transport.Sender;
+import org.apache.qpidity.transport.Struct;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.qpidity.transport.network.Frame.*;
+
+import static java.lang.Math.*;
+
+
+/**
+ * Disassembler
+ *
+ */
+
+public class Disassembler
+ implements Sender<ConnectionEvent>, ProtocolEvent.Switch<ConnectionEvent>
+{
+
+ private final Sender<NetworkEvent> sender;
+ private final int maxFrame;
+ private final byte major;
+ private final byte minor;
+
+ public Disassembler(Sender<NetworkEvent> sender, byte major, byte minor,
+ int maxFrame)
+ {
+ this.sender = sender;
+ this.major = major;
+ this.minor = minor;
+ this.maxFrame = maxFrame;
+ }
+
+ public void send(ConnectionEvent event)
+ {
+ event.getProtocolEvent().delegate(event, this);
+ }
+
+ public void close()
+ {
+ sender.close();
+ }
+
+ private void fragment(byte flags, byte type, ConnectionEvent event,
+ ByteBuffer buf, boolean first, boolean last)
+ {
+ while (buf.hasRemaining())
+ {
+ ByteBuffer slice = buf.slice();
+ slice.limit(min(maxFrame, slice.remaining()));
+ buf.position(buf.position() + slice.remaining());
+
+ byte newflags = flags;
+ if (first)
+ {
+ newflags |= FIRST_FRAME;
+ first = false;
+ }
+ if (last && !buf.hasRemaining())
+ {
+ newflags |= LAST_FRAME;
+ }
+
+ Frame frame = new Frame(newflags, type,
+ event.getProtocolEvent().getEncodedTrack(),
+ event.getChannel());
+ frame.addFragment(slice);
+ sender.send(frame);
+ }
+ }
+
+ public void init(ConnectionEvent event, ProtocolHeader header)
+ {
+ sender.send(header);
+ }
+
+ public void method(ConnectionEvent event, Method method)
+ {
+ SizeEncoder sizer = new SizeEncoder(major, minor);
+ sizer.writeLong(method.getEncodedType());
+ method.write(sizer, major, minor);
+ sizer.flush();
+ int size = sizer.getSize();
+
+ ByteBuffer buf = ByteBuffer.allocate(size);
+ BBEncoder enc = new BBEncoder(major, minor, buf);
+ enc.writeLong(method.getEncodedType());
+ method.write(enc, major, minor);
+ enc.flush();
+ buf.flip();
+
+ byte flags = FIRST_SEG;
+
+ if (!method.hasPayload())
+ {
+ flags |= LAST_SEG;
+ }
+
+ fragment(flags, METHOD, event, buf, true, true);
+ }
+
+ public void header(ConnectionEvent event, Header header)
+ {
+ SizeEncoder sizer = new SizeEncoder(major, minor);
+ for (Struct st : header.getStructs())
+ {
+ sizer.writeLongStruct(st);
+ }
+
+ ByteBuffer buf = ByteBuffer.allocate(sizer.getSize());
+ BBEncoder enc = new BBEncoder(major, minor, buf);
+ for (Struct st : header.getStructs())
+ {
+ enc.writeLongStruct(st);
+ enc.flush();
+ }
+ buf.flip();
+
+ fragment((byte) 0x0, HEADER, event, buf, true, true);
+ }
+
+ public void data(ConnectionEvent event, Data data)
+ {
+ for (ByteBuffer buf : data.getFragments())
+ {
+ fragment(LAST_SEG, BODY, event, buf, data.isFirst(),
+ data.isLast());
+ }
+ }
+
+ public void error(ConnectionEvent event, ProtocolError error)
+ {
+ sender.send(error);
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/Frame.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java
index 89e7579cb3..c36b03b104 100644
--- a/java/common/src/main/java/org/apache/qpidity/Frame.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java
@@ -18,7 +18,9 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport.network;
+
+import org.apache.qpidity.transport.util.SliceIterator;
import java.nio.ByteBuffer;
@@ -26,7 +28,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Iterator;
-import static org.apache.qpidity.Functions.*;
+import static org.apache.qpidity.transport.util.Functions.*;
/**
@@ -36,7 +38,7 @@ import static org.apache.qpidity.Functions.*;
*/
// RA: changed it to public until we sort the package issues
-public class Frame implements Iterable<ByteBuffer>
+public class Frame implements NetworkEvent, Iterable<ByteBuffer>
{
public static final int HEADER_SIZE = 12;
@@ -82,6 +84,11 @@ public class Frame implements Iterable<ByteBuffer>
size += fragment.remaining();
}
+ public byte getFlags()
+ {
+ return flags;
+ }
+
public int getChannel()
{
return channel;
@@ -137,6 +144,11 @@ public class Frame implements Iterable<ByteBuffer>
return getFragments();
}
+ public void delegate(NetworkDelegate delegate)
+ {
+ delegate.frame(this);
+ }
+
public String toString()
{
StringBuilder str = new StringBuilder();
diff --git a/java/common/src/main/java/org/apache/qpidity/InputHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
index 78cb3d1b60..191f900c02 100644
--- a/java/common/src/main/java/org/apache/qpidity/InputHandler.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
@@ -18,11 +18,15 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport.network;
import java.nio.ByteBuffer;
-import static org.apache.qpidity.InputHandler.State.*;
+import org.apache.qpidity.transport.ProtocolError;
+import org.apache.qpidity.transport.ProtocolHeader;
+import org.apache.qpidity.transport.Receiver;
+
+import static org.apache.qpidity.transport.network.InputHandler.State.*;
/**
@@ -31,7 +35,7 @@ import static org.apache.qpidity.InputHandler.State.*;
* @author Rafael H. Schloming
*/
-class InputHandler implements Handler<ByteBuffer>
+public class InputHandler implements Receiver<ByteBuffer>
{
public enum State
@@ -61,7 +65,7 @@ class InputHandler implements Handler<ByteBuffer>
ERROR;
}
- private final ProtocolActions actions;
+ private final Receiver<NetworkEvent> receiver;
private State state;
private byte instance;
@@ -75,35 +79,35 @@ class InputHandler implements Handler<ByteBuffer>
private int size;
private Frame frame;
- public InputHandler(ProtocolActions actions, State state)
+ public InputHandler(Receiver<NetworkEvent> receiver, State state)
{
- this.actions = actions;
+ this.receiver = receiver;
this.state = state;
}
- public InputHandler(ProtocolActions actions)
+ public InputHandler(Receiver<NetworkEvent> receiver)
{
- this(actions, PROTO_HDR);
+ this(receiver, PROTO_HDR);
}
private void init()
{
- actions.init(new ProtocolHeader(instance, major, minor));
+ receiver.received(new ProtocolHeader(instance, major, minor));
}
private void frame()
{
assert size == frame.getSize();
- actions.frame(frame);
+ receiver.received(frame);
frame = null;
}
private void error(String fmt, Object ... args)
{
- actions.error(new ProtocolError(fmt, args));
+ receiver.received(new ProtocolError(Frame.L1, fmt, args));
}
- public void handle(ByteBuffer buf)
+ public void received(ByteBuffer buf)
{
while (buf.hasRemaining())
{
@@ -227,4 +231,9 @@ class InputHandler implements Handler<ByteBuffer>
}
}
+ public void closed()
+ {
+ receiver.closed();
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpidity/ProtocolHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkDelegate.java
index 526ef50211..48655edd0c 100644
--- a/java/common/src/main/java/org/apache/qpidity/ProtocolHandler.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkDelegate.java
@@ -18,16 +18,19 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport.network;
+
+import org.apache.qpidity.transport.ProtocolError;
+import org.apache.qpidity.transport.ProtocolHeader;
/**
- * ProtocolHandler
+ * NetworkDelegate
*
* @author Rafael H. Schloming
*/
-interface ProtocolHandler
+public interface NetworkDelegate
{
void init(ProtocolHeader header);
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkEvent.java b/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkEvent.java
new file mode 100644
index 0000000000..080efee704
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkEvent.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport.network;
+
+
+/**
+ * NetworkEvent
+ *
+ */
+
+public interface NetworkEvent
+{
+
+ void delegate(NetworkDelegate delegate);
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
new file mode 100644
index 0000000000..90bef36790
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport.network;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.transport.ProtocolError;
+import org.apache.qpidity.transport.ProtocolHeader;
+import org.apache.qpidity.transport.Sender;
+
+import static org.apache.qpidity.transport.network.Frame.*;
+
+
+/**
+ * OutputHandler
+ *
+ */
+
+public class OutputHandler implements Sender<NetworkEvent>, NetworkDelegate
+{
+
+ private Sender<ByteBuffer> sender;
+ private Object lock = new Object();
+
+ public OutputHandler(Sender<ByteBuffer> sender)
+ {
+ this.sender = sender;
+ }
+
+ public void send(NetworkEvent event)
+ {
+ event.delegate(this);
+ }
+
+ public void close()
+ {
+ synchronized (lock)
+ {
+ sender.close();
+ }
+ }
+
+ public void init(ProtocolHeader header)
+ {
+ synchronized (lock)
+ {
+ sender.send(header.toByteBuffer());
+ }
+ }
+
+ public void frame(Frame frame)
+ {
+ ByteBuffer hdr = ByteBuffer.allocate(HEADER_SIZE);
+ hdr.put(frame.getFlags());
+ hdr.put(frame.getType());
+ hdr.putShort((short) (frame.getSize() + HEADER_SIZE));
+ hdr.put(RESERVED);
+ hdr.put(frame.getTrack());
+ hdr.putShort((short) frame.getChannel());
+ hdr.put(RESERVED);
+ hdr.put(RESERVED);
+ hdr.put(RESERVED);
+ hdr.put(RESERVED);
+ hdr.flip();
+
+ synchronized (lock)
+ {
+ sender.send(hdr);
+ for (ByteBuffer buf : frame)
+ {
+ sender.send(buf);
+ }
+ }
+ }
+
+ public void error(ProtocolError error)
+ {
+ throw new IllegalStateException("XXX");
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/MinaHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java
index f255b56d0b..ac9dab615d 100644
--- a/java/common/src/main/java/org/apache/qpidity/MinaHandler.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport.network.mina;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -28,7 +28,6 @@ import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.SimpleByteBufferAllocator;
@@ -36,6 +35,16 @@ import org.apache.mina.common.SimpleByteBufferAllocator;
import org.apache.mina.transport.socket.nio.SocketAcceptor;
import org.apache.mina.transport.socket.nio.SocketConnector;
+import org.apache.qpidity.transport.Connection;
+import org.apache.qpidity.transport.ConnectionDelegate;
+import org.apache.qpidity.transport.Receiver;
+import org.apache.qpidity.transport.Sender;
+
+import org.apache.qpidity.transport.network.Assembler;
+import org.apache.qpidity.transport.network.Disassembler;
+import org.apache.qpidity.transport.network.InputHandler;
+import org.apache.qpidity.transport.network.OutputHandler;
+
/**
* MinaHandler
@@ -57,9 +66,9 @@ public class MinaHandler implements IoHandler
public void messageReceived(IoSession ssn, Object obj)
{
- Connection conn = (Connection) ssn.getAttachment();
+ Attachment attachment = (Attachment) ssn.getAttachment();
ByteBuffer buf = (ByteBuffer) obj;
- conn.getInputHandler().handle(buf.buf());
+ attachment.receiver.received(buf.buf());
}
public void messageSent(IoSession ssn, Object obj)
@@ -80,16 +89,15 @@ public class MinaHandler implements IoHandler
public void sessionOpened(final IoSession ssn)
{
System.out.println("opened " + ssn);
- Connection conn = new Connection(new Handler<java.nio.ByteBuffer>()
- {
- public void handle(java.nio.ByteBuffer buf)
- {
- ssn.write(ByteBuffer.wrap(buf));
- }
- },
- delegate,
- state);
- ssn.setAttachment(conn);
+ // XXX: hardcoded version + max-frame
+ Connection conn = new Connection
+ (new Disassembler(new OutputHandler(new MinaSender(ssn)),
+ (byte)0, (byte)10, 64*1024),
+ delegate);
+ // XXX: hardcoded version
+ Receiver<java.nio.ByteBuffer> receiver =
+ new InputHandler(new Assembler(conn, (byte)0, (byte)10), state);
+ ssn.setAttachment(new Attachment(conn, receiver));
// XXX
synchronized (ssn)
{
@@ -100,21 +108,27 @@ public class MinaHandler implements IoHandler
public void sessionClosed(IoSession ssn)
{
System.out.println("closed " + ssn);
+ Attachment attachment = (Attachment) ssn.getAttachment();
+ attachment.receiver.closed();
ssn.setAttachment(null);
}
public void sessionIdle(IoSession ssn, IdleStatus status)
{
- System.out.println(status);
+ // do nothing
}
- public static final void main(String[] args) throws IOException
+ private class Attachment
{
- ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
- if (args[0].equals("accept")) {
- accept("0.0.0.0", 5672, SessionDelegateStub.source());
- } else if (args[0].equals("connect")) {
- connect("0.0.0.0", 5672, SessionDelegateStub.source());
+
+ Connection connection;
+ Receiver<java.nio.ByteBuffer> receiver;
+
+ Attachment(Connection connection,
+ Receiver<java.nio.ByteBuffer> receiver)
+ {
+ this.connection = connection;
+ this.receiver = receiver;
}
}
@@ -124,8 +138,7 @@ public class MinaHandler implements IoHandler
{
IoAcceptor acceptor = new SocketAcceptor();
acceptor.bind(new InetSocketAddress(host, port),
- new MinaHandler(delegate, InputHandler.State.PROTO_HDR));
-
+ new MinaHandler(delegate, InputHandler.State.PROTO_HDR));
}
public static final Connection connect(String host, int port,
@@ -134,7 +147,8 @@ public class MinaHandler implements IoHandler
MinaHandler handler = new MinaHandler(delegate,
InputHandler.State.FRAME_HDR);
SocketAddress addr = new InetSocketAddress(host, port);
- IoConnector connector = new SocketConnector();
+ SocketConnector connector = new SocketConnector();
+ connector.setWorkerTimeout(0);
ConnectFuture cf = connector.connect(addr, handler);
cf.join();
IoSession ssn = cf.getSession();
@@ -153,8 +167,8 @@ public class MinaHandler implements IoHandler
}
}
}
- Connection conn = (Connection) ssn.getAttachment();
- return conn;
+ Attachment attachment = (Attachment) ssn.getAttachment();
+ return attachment.connection;
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/BodyHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java
index 26e654d3a7..54e9ec28ef 100644
--- a/java/common/src/main/java/org/apache/qpidity/BodyHandler.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java
@@ -18,29 +18,37 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport.network.mina;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+
+import org.apache.qpidity.transport.Sender;
/**
- * BodyHandler
+ * MinaSender
*
- * @author Rafael H. Schloming
*/
-class BodyHandler implements Handler<Event<Session,Frame>>
+public class MinaSender implements Sender<java.nio.ByteBuffer>
{
- private final SessionDelegate delegate;
+ private final IoSession session;
+
+ public MinaSender(IoSession session)
+ {
+ this.session = session;
+ }
- public BodyHandler(SessionDelegate delegate)
+ public void send(java.nio.ByteBuffer buf)
{
- this.delegate = delegate;
+ session.write(ByteBuffer.wrap(buf));
}
- public void handle(Event<Session,Frame> event)
+ public void close()
{
- System.out.println("got body frame: " + event.target);
- delegate.data(event.context, event.target);
+ session.close();
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/Functions.java b/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java
index 1008966b01..fb1d4ccddf 100644
--- a/java/common/src/main/java/org/apache/qpidity/Functions.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport.util;
import java.nio.ByteBuffer;
diff --git a/java/common/src/main/java/org/apache/qpidity/SliceIterator.java b/java/common/src/main/java/org/apache/qpidity/transport/util/SliceIterator.java
index 9b4a8f90f7..32392a3561 100644
--- a/java/common/src/main/java/org/apache/qpidity/SliceIterator.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/util/SliceIterator.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport.util;
import java.nio.ByteBuffer;
@@ -31,7 +31,7 @@ import java.util.Iterator;
* @author Rafael H. Schloming
*/
-class SliceIterator implements Iterator<ByteBuffer>
+public class SliceIterator implements Iterator<ByteBuffer>
{
final private Iterator<ByteBuffer> iterator;
diff --git a/java/plugins/src/main/java/org/apache/qpid/plugins/JythonMojo.java b/java/plugins/src/main/java/org/apache/qpid/plugins/JythonMojo.java
index 1788f471d7..50af9c257e 100644
--- a/java/plugins/src/main/java/org/apache/qpid/plugins/JythonMojo.java
+++ b/java/plugins/src/main/java/org/apache/qpid/plugins/JythonMojo.java
@@ -21,6 +21,7 @@
package org.apache.qpid.plugins;
import java.io.File;
+import java.io.IOException;
import org.apache.maven.plugin.AbstractMojo;
import org.apache.maven.plugin.MojoExecutionException;
@@ -46,9 +47,44 @@ public class JythonMojo extends AbstractMojo
*/
private String[] params = new String[0];
+ /**
+ * Source file.
+ *
+ * @parameter
+ */
+ private File source;
+
+ /**
+ * Optional timestamp.
+ *
+ * @parameter
+ */
+ private File timestamp;
+
public void execute() throws MojoExecutionException
{
+ if (source != null && timestamp != null)
+ {
+ if (timestamp.lastModified() > source.lastModified())
+ {
+ return;
+ }
+ }
+
jython.main(params);
+
+ if (timestamp != null)
+ {
+ try
+ {
+ timestamp.createNewFile();
+ }
+ catch (IOException e)
+ {
+ throw new MojoExecutionException("cannot create timestamp", e);
+ }
+ timestamp.setLastModified(System.currentTimeMillis());
+ }
}
}