diff options
author | Rafael H. Schloming <rhs@apache.org> | 2007-09-13 21:42:57 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2007-09-13 21:42:57 +0000 |
commit | e10d11937bccc3cdbdd867266501c3e16d28e933 (patch) | |
tree | ee31690915cbb880ba553708ed11b9b607b23a0b | |
parent | 0a1b3430450f274aee273a9f792a2d43f771b85f (diff) | |
download | qpid-python-e10d11937bccc3cdbdd867266501c3e16d28e933.tar.gz |
* moved most of the classes in the org.apache.qpidity package to
org.apache.qpidity.transport
* factored out the network specific pieces into
org.apache.qpidity.transport
* moved the mina specific code to
org.apache.qpidity.transport.network.mina
* replaced the handler chain with Sender/Receiver chains that can
deal with close request/closed notifications
* moved from an anonymous struct[] to a real Header class
* removed an excess copy from message data transmit
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@575474 13f79535-47bb-0310-9956-ffa450edef68
84 files changed, 1511 insertions, 1639 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index aa6756d116..aaa724fd93 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -27,8 +27,8 @@ import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpidity.client.Session; import org.apache.qpidity.client.util.MessagePartListenerAdapter; import org.apache.qpidity.QpidException; -import org.apache.qpidity.RangeSet; -import org.apache.qpidity.Option; +import org.apache.qpidity.transport.RangeSet; +import org.apache.qpidity.transport.Option; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index ec27fdbb71..3bd5856df5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -28,7 +28,7 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.AMQException; import org.apache.qpidity.api.Message; -import org.apache.qpidity.Struct; +import org.apache.qpidity.transport.Struct; import javax.jms.JMSException; import java.io.IOException; diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 8ecb5ffd78..f3fa79eb51 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -26,7 +26,7 @@ import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.URLSyntaxException; import org.apache.qpidity.jms.ExceptionHelper; import org.apache.qpidity.client.util.ByteBufferMessage; -import org.apache.qpidity.ReplyTo; +import org.apache.qpidity.transport.ReplyTo; import javax.jms.Message; import javax.jms.JMSException; diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index b115086d71..f5603b1695 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -27,9 +27,9 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpidity.Struct; -import org.apache.qpidity.MessageProperties; -import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.transport.Struct; +import org.apache.qpidity.transport.MessageProperties; +import org.apache.qpidity.transport.DeliveryProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java index 5c1ee713fc..6198f0504e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java @@ -27,7 +27,7 @@ import javax.jms.JMSException; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpidity.Struct; +import org.apache.qpidity.transport.Struct; public interface MessageFactory diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java index b60fc26fc0..13a2202e6f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java @@ -30,9 +30,9 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpidity.Struct; -import org.apache.qpidity.DeliveryProperties; -import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.transport.Struct; +import org.apache.qpidity.transport.DeliveryProperties; +import org.apache.qpidity.transport.MessageProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java index 8d78f9f7fd..970ba5a66a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java @@ -25,8 +25,8 @@ import java.util.ArrayList; import java.util.List; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpidity.DeliveryProperties; -import org.apache.qpidity.Struct; +import org.apache.qpidity.transport.DeliveryProperties; +import org.apache.qpidity.transport.Struct; /** * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and diff --git a/java/client/src/main/java/org/apache/qpidity/client/Client.java b/java/client/src/main/java/org/apache/qpidity/client/Client.java index fc89f2d368..3bc684a6ca 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Client.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Client.java @@ -6,16 +6,18 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.qpidity.BrokerDetails; -import org.apache.qpidity.Channel; -import org.apache.qpidity.Connection; -import org.apache.qpidity.ConnectionClose; -import org.apache.qpidity.ConnectionDelegate; import org.apache.qpidity.ErrorCode; -import org.apache.qpidity.MinaHandler; import org.apache.qpidity.QpidException; -import org.apache.qpidity.SessionDelegate; import org.apache.qpidity.client.impl.ClientSession; import org.apache.qpidity.client.impl.ClientSessionDelegate; +import org.apache.qpidity.transport.Channel; +import org.apache.qpidity.transport.Connection; +import org.apache.qpidity.transport.ConnectionClose; +import org.apache.qpidity.transport.ConnectionDelegate; +import org.apache.qpidity.transport.ConnectionEvent; +import org.apache.qpidity.transport.ProtocolHeader; +import org.apache.qpidity.transport.SessionDelegate; +import org.apache.qpidity.transport.network.mina.MinaHandler; import org.apache.qpidity.url.QpidURL; @@ -25,47 +27,48 @@ public class Client implements org.apache.qpidity.client.Connection private Connection _conn; private ExceptionListener _exceptionListner; private final Lock _lock = new ReentrantLock(); - + /** - * + * * @return returns a new connection to the broker. */ public static org.apache.qpidity.client.Connection createConnection() { return new Client(); } - + public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException { Condition negotiationComplete = _lock.newCondition(); _lock.lock(); - + ConnectionDelegate connectionDelegate = new ConnectionDelegate() - { + { public SessionDelegate getSessionDelegate() { return new ClientSessionDelegate(); } - - @Override public void connectionClose(Channel context, ConnectionClose connectionClose) + + @Override public void connectionClose(Channel context, ConnectionClose connectionClose) { _exceptionListner.onException( - new QpidException("Server closed the connection: Reason " + + new QpidException("Server closed the connection: Reason " + connectionClose.getReplyText(), ErrorCode.get(connectionClose.getReplyCode()), null)); } }; - + connectionDelegate.setCondition(_lock,negotiationComplete); connectionDelegate.setUsername(username); connectionDelegate.setPassword(password); connectionDelegate.setVirtualHost(virtualHost); - + _conn = MinaHandler.connect(host, port,connectionDelegate); - - _conn.getOutputHandler().handle(_conn.getHeader().toByteBuffer()); - + + // XXX: hardcoded version numbers + _conn.send(new ConnectionEvent(0, new ProtocolHeader(1, 0, 10))); + try { negotiationComplete.await(); @@ -79,7 +82,7 @@ public class Client implements org.apache.qpidity.client.Connection _lock.unlock(); } } - + /* * Until the dust settles with the URL disucssion * I am not going to implement this. @@ -94,9 +97,9 @@ public class Client implements org.apache.qpidity.client.Connection details.getUserName(), details.getPassword()); } - + public void close() throws QpidException - { + { Channel ch = _conn.getChannel(0); ch.connectionClose(0, "client is closing", 0, 0); //need to close the connection underneath as well @@ -104,7 +107,7 @@ public class Client implements org.apache.qpidity.client.Connection public Session createSession(long expiryInSeconds) { - Channel ch = _conn.getChannel(_channelNo.incrementAndGet()); + Channel ch = _conn.getChannel(_channelNo.incrementAndGet()); ClientSession ssn = new ClientSession(); ssn.attach(ch); ssn.sessionOpen(expiryInSeconds); @@ -116,10 +119,10 @@ public class Client implements org.apache.qpidity.client.Connection // TODO Auto-generated method stub return null; } - + public void setExceptionListener(ExceptionListener exceptionListner) { - _exceptionListner = exceptionListner; + _exceptionListner = exceptionListner; } } diff --git a/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java b/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java index 21532f7d46..bf6433af6a 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java +++ b/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java @@ -18,15 +18,15 @@ */ package org.apache.qpidity.client; -import org.apache.qpidity.DtxCoordinationCommitResult; -import org.apache.qpidity.DtxCoordinationGetTimeoutResult; -import org.apache.qpidity.DtxCoordinationPrepareResult; -import org.apache.qpidity.DtxCoordinationRecoverResult; -import org.apache.qpidity.DtxCoordinationRollbackResult; -import org.apache.qpidity.DtxDemarcationEndResult; -import org.apache.qpidity.DtxDemarcationStartResult; -import org.apache.qpidity.Future; -import org.apache.qpidity.Option; +import org.apache.qpidity.transport.DtxCoordinationCommitResult; +import org.apache.qpidity.transport.DtxCoordinationGetTimeoutResult; +import org.apache.qpidity.transport.DtxCoordinationPrepareResult; +import org.apache.qpidity.transport.DtxCoordinationRecoverResult; +import org.apache.qpidity.transport.DtxCoordinationRollbackResult; +import org.apache.qpidity.transport.DtxDemarcationEndResult; +import org.apache.qpidity.transport.DtxDemarcationStartResult; +import org.apache.qpidity.transport.Future; +import org.apache.qpidity.transport.Option; /** * This session�s resources are control under the scope of a distributed transaction. diff --git a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java index 4ccef6df55..f0c2c8ca9c 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java +++ b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java @@ -19,7 +19,7 @@ package org.apache.qpidity.client; import java.nio.ByteBuffer; -import org.apache.qpidity.Struct; +import org.apache.qpidity.transport.Header; /** * Assembles message parts. @@ -47,7 +47,7 @@ public interface MessagePartListener * * @param headers Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code> */ - public void messageHeaders(Struct... headers); + public void messageHeader(Header header); /** * Add the following byte array to the content of the message being received diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java index d8be937e46..2340d27882 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Session.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java @@ -22,9 +22,9 @@ import java.io.IOException; import java.nio.ByteBuffer;
import java.util.Map;
-import org.apache.qpidity.Option;
-import org.apache.qpidity.RangeSet;
-import org.apache.qpidity.Struct;
+import org.apache.qpidity.transport.Option;
+import org.apache.qpidity.transport.RangeSet;
+import org.apache.qpidity.transport.Struct;
import org.apache.qpidity.api.Message;
/**
@@ -186,7 +186,7 @@ public interface Session * @see org.apache.qpidity.DeliveryProperties
* @see org.apache.qpidity.MessageProperties
*/
- public void headers(Struct... headers);
+ public void header(Struct... headers);
/**
* Add the following byte array to the content of the message being sent.
diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java index ea225976f2..2e53bdfcad 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java @@ -5,10 +5,10 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; -import org.apache.qpidity.Option; +import org.apache.qpidity.transport.Option; import org.apache.qpidity.QpidException; -import org.apache.qpidity.Range; -import org.apache.qpidity.RangeSet; +import org.apache.qpidity.transport.Range; +import org.apache.qpidity.transport.RangeSet; import org.apache.qpidity.api.Message; import org.apache.qpidity.client.ExceptionListener; import org.apache.qpidity.client.MessagePartListener; @@ -16,7 +16,7 @@ import org.apache.qpidity.client.MessagePartListener; /** * Implements a Qpid Sesion. */ -public class ClientSession extends org.apache.qpidity.Session implements org.apache.qpidity.client.Session +public class ClientSession extends org.apache.qpidity.transport.Session implements org.apache.qpidity.client.Session { private Map<String,MessagePartListener> _messageListeners = new HashMap<String,MessagePartListener>(); private ExceptionListener _exceptionListner; @@ -46,7 +46,7 @@ public class ClientSession extends org.apache.qpidity.Session implements org.apa // The javadoc clearly says that this method is suitable for small messages // therefore reading the content in one shot. super.messageTransfer(destination, confirmMode, acquireMode); - super.headers(msg.getDeliveryProperties(),msg.getMessageProperties()); + super.header(msg.getDeliveryProperties(),msg.getMessageProperties()); super.data(msg.readData()); super.endData(); } @@ -54,7 +54,7 @@ public class ClientSession extends org.apache.qpidity.Session implements org.apa public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException { super.messageTransfer(destination, confirmMode, acquireMode); - super.headers(msg.getDeliveryProperties(),msg.getMessageProperties()); + super.header(msg.getDeliveryProperties(),msg.getMessageProperties()); boolean b = true; int count = 0; while(b) diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java index c4565d4544..1e003b23e6 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java +++ b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java @@ -3,18 +3,21 @@ package org.apache.qpidity.client.impl; import java.nio.ByteBuffer; import org.apache.qpidity.ErrorCode; -import org.apache.qpidity.Frame; -import org.apache.qpidity.MessageAcquired; -import org.apache.qpidity.MessageReject; -import org.apache.qpidity.MessageTransfer; -import org.apache.qpidity.QpidException; -import org.apache.qpidity.Range; -import org.apache.qpidity.Session; -import org.apache.qpidity.SessionClosed; -import org.apache.qpidity.SessionDelegate; -import org.apache.qpidity.Struct; + import org.apache.qpidity.client.MessagePartListener; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.transport.Data; +import org.apache.qpidity.transport.Header; +import org.apache.qpidity.transport.MessageAcquired; +import org.apache.qpidity.transport.MessageReject; +import org.apache.qpidity.transport.MessageTransfer; +import org.apache.qpidity.transport.Range; +import org.apache.qpidity.transport.Session; +import org.apache.qpidity.transport.SessionClosed; +import org.apache.qpidity.transport.SessionDelegate; +import org.apache.qpidity.transport.Struct; + public class ClientSessionDelegate extends SessionDelegate { @@ -29,22 +32,22 @@ public class ClientSessionDelegate extends SessionDelegate // -------------------------------------------- // Message methods // -------------------------------------------- - @Override public void data(Session ssn, Frame frame) + @Override public void data(Session ssn, Data data) { - for (ByteBuffer b : frame) + for (ByteBuffer b : data.getFragments()) { _currentMessageListener.data(b); } - if (frame.isLastSegment() && frame.isLastFrame()) + if (data.isLast()) { _currentMessageListener.messageReceived(); } } - @Override public void headers(Session ssn, Struct... headers) + @Override public void header(Session ssn, Header header) { - _currentMessageListener.messageHeaders(headers); + _currentMessageListener.messageHeader(header); } diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java b/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java index 1f8e9610d1..c15ac8f6e5 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java +++ b/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java @@ -1,7 +1,5 @@ package org.apache.qpidity.client.impl; -import org.apache.qpidity.DeliveryProperties; -import org.apache.qpidity.MessageProperties; import org.apache.qpidity.QpidException; import org.apache.qpidity.api.Message; import org.apache.qpidity.client.Client; @@ -10,6 +8,8 @@ import org.apache.qpidity.client.ExceptionListener; import org.apache.qpidity.client.Session; import org.apache.qpidity.client.util.MessageListener; import org.apache.qpidity.client.util.MessagePartListenerAdapter; +import org.apache.qpidity.transport.DeliveryProperties; +import org.apache.qpidity.transport.MessageProperties; public class DemoClient { @@ -53,14 +53,14 @@ public class DemoClient // queue ssn.messageTransfer("amq.direct", (short) 0, (short) 1); - ssn.headers(new DeliveryProperties().setRoutingKey("queue1"),new MessageProperties().setMessageId("123")); + ssn.header(new DeliveryProperties().setRoutingKey("queue1"),new MessageProperties().setMessageId("123")); ssn.data("this is the data"); ssn.endData(); //reject ssn.messageTransfer("amq.direct", (short) 0, (short) 1); ssn.data("this should be rejected"); - ssn.headers(new DeliveryProperties().setRoutingKey("stocks")); + ssn.header(new DeliveryProperties().setRoutingKey("stocks")); ssn.endData(); ssn.sync(); @@ -81,7 +81,7 @@ public class DemoClient // topic ssn.messageTransfer("amq.topic", (short) 0, (short) 1); ssn.data("Topic message"); - ssn.headers(new DeliveryProperties().setRoutingKey("stock.us.ibm"),new MessageProperties().setMessageId("456")); + ssn.header(new DeliveryProperties().setRoutingKey("stock.us.ibm"),new MessageProperties().setMessageId("456")); ssn.endData(); ssn.sync(); } diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java b/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java index acf5c283b2..4e90a82fff 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java +++ b/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java @@ -2,8 +2,6 @@ package org.apache.qpidity.client.impl; import java.io.FileInputStream; -import org.apache.qpidity.DeliveryProperties; -import org.apache.qpidity.MessageProperties; import org.apache.qpidity.QpidException; import org.apache.qpidity.api.Message; import org.apache.qpidity.client.Client; @@ -13,6 +11,8 @@ import org.apache.qpidity.client.Session; import org.apache.qpidity.client.util.FileMessage; import org.apache.qpidity.client.util.MessageListener; import org.apache.qpidity.client.util.MessagePartListenerAdapter; +import org.apache.qpidity.transport.DeliveryProperties; +import org.apache.qpidity.transport.MessageProperties; public class LargeMsgDemoClient { diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java index 218c6cd018..bf0889a913 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java @@ -5,8 +5,8 @@ import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.Queue; -import org.apache.qpidity.DeliveryProperties; -import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.transport.DeliveryProperties; +import org.apache.qpidity.transport.MessageProperties; import org.apache.qpidity.api.Message; /** diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java index 73e71ed84d..34fc057724 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java @@ -7,8 +7,8 @@ import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; -import org.apache.qpidity.DeliveryProperties; -import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.transport.DeliveryProperties; +import org.apache.qpidity.transport.MessageProperties; import org.apache.qpidity.api.Message; /** diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java index e4f19ea6c3..c40a85863d 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java @@ -3,16 +3,16 @@ package org.apache.qpidity.client.util; import java.io.IOException; import java.nio.ByteBuffer; -import org.apache.qpidity.DeliveryProperties; -import org.apache.qpidity.MessageProperties; -import org.apache.qpidity.Struct; +import org.apache.qpidity.transport.DeliveryProperties; +import org.apache.qpidity.transport.MessageProperties; +import org.apache.qpidity.transport.Header; import org.apache.qpidity.client.MessagePartListener; -/** +/** * This is a simple message assembler. - * Will call onMessage method of the adaptee + * Will call onMessage method of the adaptee * when all message data is read. - * + * * This is a good convinience utility for handling * small messages */ @@ -20,17 +20,17 @@ public class MessagePartListenerAdapter implements MessagePartListener { MessageListener _adaptee; ByteBufferMessage _currentMsg; - + public MessagePartListenerAdapter(MessageListener listener) { - _adaptee = listener; + _adaptee = listener; } - + public void messageTransfer(long transferId) { _currentMsg = new ByteBufferMessage(transferId); } - + public void data(ByteBuffer src) { try @@ -40,28 +40,20 @@ public class MessagePartListenerAdapter implements MessagePartListener catch(IOException e) { // A chance for IO exception - // doesn't occur as we are using + // doesn't occur as we are using // a ByteBuffer } } - public void messageHeaders(Struct... headers) - { - for(Struct struct: headers) - { - if(struct instanceof DeliveryProperties) - { - _currentMsg.setDeliveryProperties((DeliveryProperties)struct); - } - else if (struct instanceof MessageProperties) - { - _currentMsg.setMessageProperties((MessageProperties)struct); - } - } - } - - public void messageReceived() - { + public void messageHeader(Header header) + { + _currentMsg.setDeliveryProperties(header.get(DeliveryProperties.class)); + _currentMsg.setMessageProperties(header.get(MessageProperties.class)); + } + + public void messageReceived() + { _adaptee.onMessage(_currentMsg); - } + } + } diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java index 2527856798..dfb65214d2 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java @@ -2,8 +2,8 @@ package org.apache.qpidity.client.util; import java.nio.ByteBuffer; -import org.apache.qpidity.DeliveryProperties; -import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.transport.DeliveryProperties; +import org.apache.qpidity.transport.MessageProperties; import org.apache.qpidity.api.Message; public abstract class ReadOnlyMessage implements Message diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java index 23089c7931..37e726d6c6 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java @@ -5,8 +5,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; -import org.apache.qpidity.DeliveryProperties; -import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.transport.DeliveryProperties; +import org.apache.qpidity.transport.MessageProperties; import org.apache.qpidity.api.Message; public class StreamingMessage extends ReadOnlyMessage implements Message diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java index a3c03ca6d0..8b6e694d25 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -26,9 +26,7 @@ import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; -import org.apache.qpidity.Option; import org.apache.qpidity.QpidException; -import org.apache.qpidity.RangeSet; import org.apache.qpidity.client.MessagePartListener; import org.apache.qpidity.client.util.MessagePartListenerAdapter; import org.apache.qpidity.exchange.ExchangeDefaults; @@ -36,6 +34,8 @@ import org.apache.qpidity.filter.JMSSelectorFilter; import org.apache.qpidity.filter.MessageFilter; import org.apache.qpidity.jms.message.MessageFactory; import org.apache.qpidity.jms.message.QpidMessage; +import org.apache.qpidity.transport.Option; +import org.apache.qpidity.transport.RangeSet; /** * Implementation of JMS message consumer diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java index 9ed74e1cd0..ec837bc4e6 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java @@ -18,7 +18,7 @@ package org.apache.qpidity.jms; import org.apache.qpidity.QpidException; -import org.apache.qpidity.Option; +import org.apache.qpidity.transport.Option; import org.apache.qpidity.url.BindingURL; import org.apache.qpidity.exchange.ExchangeDefaults; diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java index 675a51625a..40bf7f15a8 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java @@ -21,7 +21,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpidity.jms.message.*; import org.apache.qpidity.QpidException; -import org.apache.qpidity.RangeSet; +import org.apache.qpidity.transport.RangeSet; import javax.jms.*; import javax.jms.IllegalStateException; diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java index 22feb29598..d23b08e37a 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java @@ -18,8 +18,8 @@ package org.apache.qpidity.jms; import org.apache.qpidity.QpidException; -import org.apache.qpidity.Option; import org.apache.qpidity.exchange.ExchangeDefaults; +import org.apache.qpidity.transport.Option; import org.apache.qpidity.url.BindingURL; import javax.jms.Topic; diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java index b84b55966e..dab7585deb 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java @@ -21,8 +21,10 @@ import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; -import org.apache.qpidity.*; +import org.apache.qpidity.QpidException; import org.apache.qpidity.dtx.XidImpl; +import org.apache.qpidity.transport.*; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java index c11a7d8c3b..a7e4d02651 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java @@ -29,8 +29,8 @@ import java.io.IOException; import org.apache.qpidity.ErrorCode; import org.apache.qpidity.QpidException; -import org.apache.qpidity.ReplyTo; import org.apache.qpidity.client.util.ByteBufferMessage; +import org.apache.qpidity.transport.ReplyTo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/java/common/generate b/java/common/generate index ac49c6283d..b2448c6e5d 100755 --- a/java/common/generate +++ b/java/common/generate @@ -8,6 +8,7 @@ from cStringIO import StringIO out_dir=sys.argv[1] out_pkg = sys.argv[2] spec_file = sys.argv[3] + spec = mllib.xml_parse(spec_file) class Output: @@ -27,6 +28,8 @@ class Output: self.line("import org.apache.qpidity.codec.Encodable;") self.line("import org.apache.qpidity.codec.Encoder;") self.line() + self.line("import org.apache.qpidity.transport.network.Frame;") + self.line() self.line() def line(self, l = ""): @@ -363,7 +366,7 @@ fct.line("}"); fct.write() dlg = Output(out_dir, out_pkg, "Delegate") -dlg.line("public abstract class Delegate<C> {") +dlg.line("public abstract class Delegate<C> extends AbstractDelegate<C> {") for s in structs: dlg.line(" public void %s(C context, %s struct) {}" % (dromedary(s.name), s.name)) diff --git a/java/common/pom.xml b/java/common/pom.xml index d480aecb1b..25f8550304 100644 --- a/java/common/pom.xml +++ b/java/common/pom.xml @@ -6,9 +6,9 @@ to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -87,9 +87,11 @@ <param>-Dpython.path=${basedir}/jython-lib.jar/Lib${path.separator}${mllib.dir}</param> <param>${basedir}/generate</param> <param>${generated.path}</param> - <param>org.apache.qpidity</param> + <param>org.apache.qpidity.transport</param> <param>${specs.dir}/amqp.0-10-preview.xml</param> </params> + <source>${specs.dir}/amqp.0-10-preview.xml</source> + <timestamp>${generated.path}/generated.timestamp</timestamp> </configuration> <goals> <goal>jython</goal> @@ -97,8 +99,8 @@ </execution> </executions> </plugin> - - <!-- Generates message selector grammar --> + + <!-- Generates message selector grammar --> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>javacc-maven-plugin</artifactId> @@ -145,13 +147,13 @@ </executions> </plugin> - + </plugins> </build> <dependencies> - <dependency> + <dependency> <groupId>commons-configuration</groupId> <artifactId>commons-configuration</artifactId> </dependency> @@ -166,14 +168,14 @@ <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> - <version>1.4.0</version> + <version>1.4.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.4.0</version> - <scope>test</scope> + <scope>test</scope> </dependency> <dependency> @@ -204,7 +206,7 @@ <scope>provided</scope> </dependency> <!--- This is used by filter --> - <dependency> + <dependency> <groupId>org.apache.geronimo.specs</groupId> <artifactId>geronimo-jms_1.1_spec</artifactId> </dependency> diff --git a/java/common/src/main/java/org/apache/qpidity/Channel.java b/java/common/src/main/java/org/apache/qpidity/Channel.java deleted file mode 100644 index 3c734ba8f4..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/Channel.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - -import java.nio.ByteBuffer; - -import java.util.List; -import java.util.ArrayList; - -import org.apache.qpidity.codec.SegmentEncoder; -import org.apache.qpidity.codec.SizeEncoder; - -import static org.apache.qpidity.Frame.*; -import static org.apache.qpidity.Functions.*; - - -/** - * Channel - * - * @author Rafael H. Schloming - */ - -public class Channel extends Invoker implements Handler<Frame> -{ - - final private Connection connection; - final private int channel; - final private TrackSwitch<Channel> tracks; - final private Delegate<Channel> delegate; - final private SessionDelegate sessionDelegate; - // session may be null - private Session session; - - private Method method = null; - private List<ByteBuffer> data = null; - private int dataSize; - - public Channel(Connection connection, int channel, SessionDelegate delegate) - { - this.connection = connection; - this.channel = channel; - this.delegate = new ChannelDelegate(); - this.sessionDelegate = delegate; - - tracks = new TrackSwitch<Channel>(); - tracks.map(L1, new MethodHandler<Channel> - (getMajor(), getMinor(), connection.getConnectionDelegate())); - tracks.map(L2, new MethodHandler<Channel> - (getMajor(), getMinor(), this.delegate)); - tracks.map(L3, new SessionResolver<Frame> - (new MethodHandler<Session> - (getMajor(), getMinor(), delegate))); - tracks.map(L4, new SessionResolver<Frame> - (new ContentHandler(getMajor(), getMinor(), delegate))); - } - - public byte getMajor() - { - return connection.getMajor(); - } - - public byte getMinor() - { - return connection.getMinor(); - } - - public int getEncodedChannel() { - return channel; - } - - public Session getSession() - { - return session; - } - - void setSession(Session session) - { - this.session = session; - } - - public void handle(Frame frame) - { - tracks.handle(new Event<Channel,Frame>(this, frame)); - } - - private SegmentEncoder newEncoder(byte flags, byte track, byte type, int size) - { - return new SegmentEncoder(getMajor(), - getMinor(), - connection.getOutputHandler(), - connection.getMaxFrame(), - (byte) (flags | VERSION), - track, - type, - channel, - size); - } - - public void method(Method m) - { - SizeEncoder sizer = new SizeEncoder(getMajor(), getMinor()); - sizer.writeLong(m.getEncodedType()); - m.write(sizer, getMajor(), getMinor()); - sizer.flush(); - int size = sizer.getSize(); - - byte flags = FIRST_SEG; - - if (!m.hasPayload()) - { - flags |= LAST_SEG; - } - - SegmentEncoder enc = newEncoder(flags, m.getEncodedTrack(), - m.getSegmentType(), size); - enc.writeLong(m.getEncodedType()); - m.write(enc, getMajor(), getMinor()); - enc.flush(); - - if (m.hasPayload()) - { - method = m; - } - - if (m.getEncodedTrack() != Frame.L4) - { - System.out.println("sent control " + m.getClass().getName()); - } - } - - public void headers(Struct ... headers) - { - if (method == null) - { - throw new IllegalStateException("cannot write headers without method"); - } - - SizeEncoder sizer = new SizeEncoder(getMajor(), getMinor()); - for (Struct hdr : headers) - { - sizer.writeLongStruct(hdr); - } - - SegmentEncoder enc = newEncoder((byte) 0x0, - method.getEncodedTrack(), - HEADER, - sizer.getSize()); - for (Struct hdr : headers) - { - enc.writeLongStruct(hdr); - enc.flush(); - System.out.println("sent " + hdr); - } - } - - public void data(ByteBuffer buf) - { - if (data == null) - { - data = new ArrayList<ByteBuffer>(); - dataSize = 0; - } - data.add(buf); - dataSize += buf.remaining(); - } - - public void data(String str) - { - data(str.getBytes()); - } - - public void data(byte[] bytes) - { - data(ByteBuffer.wrap(bytes)); - } - - public void end() - { - byte flags = LAST_SEG; - SegmentEncoder enc = newEncoder(flags, method.getEncodedTrack(), - BODY, dataSize); - for (ByteBuffer buf : data) - { - enc.put(buf); - System.out.println("sent " + str(buf)); - } - enc.flush(); - data = null; - dataSize = 0; - } - - protected void invoke(Method m) - { - method(m); - } - - protected <T> Future<T> invoke(Method m, Class<T> cls) - { - throw new UnsupportedOperationException(); - } - -} diff --git a/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java b/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java deleted file mode 100644 index c6190dc3d7..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - - -/** - * CommandDispatcher - * - * @author Rafael H. Schloming - */ - -class CommandDispatcher implements Handler<Event<Session,Method>> -{ - - private final Delegate<Session> delegate; - - public CommandDispatcher(Delegate<Session> delegate) - { - this.delegate = delegate; - } - - public void handle(Event<Session,Method> event) - { - Session ssn = event.context; - Method method = event.target; - method.setId(ssn.nextCommandId()); - System.out.println("\n Delegating " + method.getClass().getName() + "[" + method.getId() + "] to " + delegate.getClass().getName() + "\n"); - method.delegate(ssn, delegate); - if (!method.hasPayload()) - { - ssn.processed(method); - } - } - -} diff --git a/java/common/src/main/java/org/apache/qpidity/Connection.java b/java/common/src/main/java/org/apache/qpidity/Connection.java deleted file mode 100644 index b70c8fae18..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/Connection.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - -import java.util.HashMap; -import java.util.Map; - -import java.nio.ByteBuffer; - - -/** - * Connection - * - * @author Rafael H. Schloming - * - * @todo the channels map should probably be replaced with something - * more efficient, e.g. an array or a map implementation that can use - * short instead of Short - */ - -// RA making this public until we sort out the package issues -public class Connection implements ProtocolActions -{ - - final private Handler<ByteBuffer> input; - final private Handler<ByteBuffer> output; - final private ConnectionDelegate delegate; - - final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>(); - // XXX: hardcoded versions - private ProtocolHeader header = new ProtocolHeader((byte) 1, (byte) 0, (byte) 10); - // XXX - private int maxFrame = 64*1024; - - public Connection(Handler<ByteBuffer> output, - ConnectionDelegate delegate, - InputHandler.State state) - { - this.input = new InputHandler(this, state); - this.output = output; - this.delegate = delegate; - } - - public ConnectionDelegate getConnectionDelegate() - { - return delegate; - } - - public Connection(Handler<ByteBuffer> output, - ConnectionDelegate delegate) - { - this(output, delegate, InputHandler.State.PROTO_HDR); - } - - public Handler<ByteBuffer> getInputHandler() - { - return input; - } - - public Handler<ByteBuffer> getOutputHandler() - { - return output; - } - - public ProtocolHeader getHeader() - { - return header; - } - - public byte getMajor() - { - return header.getMajor(); - } - - public byte getMinor() - { - return header.getMinor(); - } - - public int getMaxFrame() - { - return maxFrame; - } - - public void init(ProtocolHeader hdr) - { - System.out.println(header); - if (hdr.getMajor() != header.getMajor() && - hdr.getMinor() != header.getMinor()) - { - output.handle(header.toByteBuffer()); - // XXX: how do we close the connection? - } - - // not sure if this is the right place - System.out.println("\n--------------------Broker Start Connection Negotiation -----------------------\n"); - - getChannel(0).connectionStart(header.getMajor(), header.getMinor(), null, "PLAIN", "utf8"); - } - - public Channel getChannel(int number) - { - Channel channel = channels.get(number); - if (channel == null) - { - channel = new Channel(this, number, delegate.getSessionDelegate()); - channels.put(number, channel); - } - return channel; - } - - public void frame(Frame frame) - { - Channel channel = getChannel(frame.getChannel()); - channel.handle(frame); - } - - public void error(ProtocolError error) - { - throw new RuntimeException(error.getMessage()); - } - -} diff --git a/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java b/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java index fa35e9332f..4e05aa574c 100644 --- a/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java +++ b/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java @@ -22,7 +22,9 @@ package org.apache.qpidity; import java.nio.ByteBuffer; -import static org.apache.qpidity.Functions.*; +import org.apache.qpidity.transport.Sender; + +import static org.apache.qpidity.transport.util.Functions.*; /** @@ -31,12 +33,17 @@ import static org.apache.qpidity.Functions.*; * @author Rafael H. Schloming */ -class ConsoleOutput implements Handler<ByteBuffer> +public class ConsoleOutput implements Sender<ByteBuffer> { - public void handle(ByteBuffer buf) + public void send(ByteBuffer buf) { System.out.println(str(buf)); } + public void close() + { + System.out.println("CLOSED"); + } + } diff --git a/java/common/src/main/java/org/apache/qpidity/ContentHandler.java b/java/common/src/main/java/org/apache/qpidity/ContentHandler.java deleted file mode 100644 index b54e28c85e..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/ContentHandler.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - - -/** - * ContentHandler is a stateful handler that aggregates and dispatches - * method and header frames, and passes body frames through to another - * handler. - * - * @author Rafael H. Schloming - */ - -class ContentHandler extends TypeSwitch<Session> -{ - - public ContentHandler(byte major, byte minor, SessionDelegate delegate) - { - CommandDispatcher disp = new CommandDispatcher(delegate); - MethodDecoder<Session> dec = new MethodDecoder<Session>(major, minor, disp); - HeaderHandler hh = new HeaderHandler(major, minor, delegate); - map(Frame.METHOD, new SegmentAssembler<Session>(dec)); - map(Frame.HEADER, new SegmentAssembler<Session>(hh)); - map(Frame.BODY, new BodyHandler(delegate)); - } - -} diff --git a/java/common/src/main/java/org/apache/qpidity/Event.java b/java/common/src/main/java/org/apache/qpidity/Event.java deleted file mode 100644 index ae210f4c20..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/Event.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - - -/** - * Events are a common class of thing to handle. An event has a - * context and a target. This division permits the same target - * instance to be used in a variety of contexts. - * - * @author Rafael H. Schloming - */ - -public class Event<C, T> -{ - - C context; - T target; - - public Event(C context, T target) - { - this.context = context; - this.target = target; - } - -} diff --git a/java/common/src/main/java/org/apache/qpidity/Handler.java b/java/common/src/main/java/org/apache/qpidity/Handler.java deleted file mode 100644 index 0ff949cc1d..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/Handler.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - -/** - * Handler is a basic interface used throughout this library for - * callbacks, listeners, event handlers, etc. - * - * @author Rafael H. Schloming - */ - -public interface Handler<E> -{ - - void handle(E event); - -} diff --git a/java/common/src/main/java/org/apache/qpidity/HeaderHandler.java b/java/common/src/main/java/org/apache/qpidity/HeaderHandler.java deleted file mode 100644 index 8737932712..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/HeaderHandler.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - -import java.nio.ByteBuffer; - -import java.util.ArrayList; -import java.util.Iterator; - -import org.apache.qpidity.codec.FragmentDecoder; - - -/** - * HeaderHandler - * - * @author Rafael H. Schloming - */ - -class HeaderHandler implements Handler<Event<Session,Segment>> -{ - - private static final Struct[] EMPTY_STRUCT_ARRAY = {}; - - private final byte major; - private final byte minor; - private final SessionDelegate delegate; - - public HeaderHandler(byte major, byte minor, SessionDelegate delegate) - { - this.major = major; - this.minor = minor; - this.delegate = delegate; - } - - public void handle(Event<Session,Segment> event) - { - System.out.println("got header segment:\n " + event.target); - Iterator<ByteBuffer> fragments = event.target.getFragments(); - FragmentDecoder dec = new FragmentDecoder(major, minor, fragments); - ArrayList<Struct> headers = new ArrayList(); - while (dec.hasRemaining()) - { - headers.add(dec.readLongStruct()); - } - delegate.headers(event.context, headers.toArray(EMPTY_STRUCT_ARRAY)); - } - -} diff --git a/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java b/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java deleted file mode 100644 index b01f067381..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - -import java.nio.ByteBuffer; - -import java.util.Iterator; - -import org.apache.qpidity.codec.Decoder; -import org.apache.qpidity.codec.FragmentDecoder; - - -/** - * MethodDecoder - * - * @author Rafael H. Schloming - */ - -class MethodDecoder<C> implements Handler<Event<C,Segment>> -{ - - private final byte major; - private final byte minor; - private final Handler<Event<C,Method>> handler; - - public MethodDecoder(byte major, byte minor, Handler<Event<C,Method>> handler) - { - this.major = major; - this.minor = minor; - this.handler = handler; - } - - public void handle(Event<C,Segment> event) - { - //System.out.println("got method segment:\n " + event.target); - Iterator<ByteBuffer> fragments = event.target.getFragments(); - Decoder dec = new FragmentDecoder(major, minor, fragments); - int type = (int) dec.readLong(); - Method method = Method.create(type); - method.read(dec, major, minor); - handler.handle(new Event<C,Method>(event.context, method)); - } - -} diff --git a/java/common/src/main/java/org/apache/qpidity/MethodHandler.java b/java/common/src/main/java/org/apache/qpidity/MethodHandler.java deleted file mode 100644 index 86dac241a2..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/MethodHandler.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - - -/** - * MethodHandler is a stateful handler that aggregates frames into - * method segments and dispatches the resulting method. It does not - * accept any segment type other than Frame.METHOD. - * - * @author Rafael H. Schloming - */ - -class MethodHandler<C> extends TypeSwitch<C> -{ - - public MethodHandler(byte major, byte minor, Delegate<C> delegate) - { - MethodDispatcher disp = new MethodDispatcher<C>(delegate); - MethodDecoder<C> dec = new MethodDecoder<C>(major, minor, disp); - map(Frame.METHOD, new SegmentAssembler<C>(dec)); - } - -} diff --git a/java/common/src/main/java/org/apache/qpidity/SegmentAssembler.java b/java/common/src/main/java/org/apache/qpidity/SegmentAssembler.java deleted file mode 100644 index 0fb90a2f53..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/SegmentAssembler.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - -import java.nio.ByteBuffer; - - -/** - * SegmentAssembler is a stateful handler that aggregates Frame events - * into Segment events. This should only be used where it is necessary - * to assemble a Segment before processing, e.g. for Method and Header - * segments. - * - * @author Rafael H. Schloming - */ - -class SegmentAssembler<C> implements Handler<Event<C,Frame>> -{ - - final private Handler<Event<C,Segment>> handler; - private Segment segment; - - public SegmentAssembler(Handler<Event<C,Segment>> handler) - { - this.handler = handler; - } - - public void handle(Event<C, Frame> event) - { - Frame frame = event.target; - if (frame.isFirstFrame()) - { - segment = new Segment(); - } - - for (ByteBuffer fragment : frame) - { - segment.add(fragment); - } - - if (frame.isLastFrame()) - { - handler.handle(new Event<C, Segment>(event.context, segment)); - } - } - -} diff --git a/java/common/src/main/java/org/apache/qpidity/SessionResolver.java b/java/common/src/main/java/org/apache/qpidity/SessionResolver.java deleted file mode 100644 index 24e52839fd..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/SessionResolver.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - - -/** - * SessionResolver is a stateless handler that accepts incoming events - * whose context is a Channel, and produces an event whose context is - * a Session. - * - * @author Rafael H. Schloming - */ - -class SessionResolver<T> implements Handler<Event<Channel,T>> -{ - - final private Handler<Event<Session,T>> handler; - - public SessionResolver(Handler<Event<Session,T>> handler) - { - this.handler = handler; - } - - public void handle(Event<Channel,T> event) - { - handler.handle(new Event<Session,T>(event.context.getSession(), event.target)); - } - -} diff --git a/java/common/src/main/java/org/apache/qpidity/Stub.java b/java/common/src/main/java/org/apache/qpidity/Stub.java deleted file mode 100644 index 7aa5030672..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/Stub.java +++ /dev/null @@ -1,109 +0,0 @@ -package org.apache.qpidity; - -import java.util.*; -import java.lang.annotation.*; - -import java.nio.ByteBuffer; - -import org.apache.qpidity.codec.BBEncoder; -import org.apache.qpidity.codec.Encoder; -import org.apache.qpidity.codec.SizeEncoder; - -import static org.apache.qpidity.Option.*; - - -public class Stub { - - private static final byte major = 0; - private static final byte minor = 10; - - private static Connection conn = new Connection(new ConsoleOutput(), - SessionDelegateStub.source()); - - static - { - conn.init(new ProtocolHeader((byte) 1, major, minor)); - } - - private static void frame(byte track, byte type, boolean first, boolean last) { - frame(track, type, first, last, null); - } - - private static void frame(byte track, byte type, boolean first, boolean last, Method m) { - SizeEncoder sizer = new SizeEncoder(major, minor); - if (m != null) { - sizer.writeLong(m.getEncodedType()); - m.write(sizer, major, minor); - sizer.flush(); - } - ByteBuffer buf = ByteBuffer.allocate(sizer.getSize()); - if (m != null) { - Encoder enc = new BBEncoder(major, minor, buf); - enc.writeLong(m.getEncodedType()); - m.write(enc, major, minor); - enc.flush(); - } - buf.flip(); - byte flags = Frame.VERSION; - if (first) { flags |= Frame.FIRST_FRAME; } - if (last) { flags |= Frame.LAST_FRAME; } - Frame frame = new Frame(flags, type, track, 0); - frame.addFragment(buf); - conn.frame(frame); - } - - public static final void main(String[] args) { - frame(Frame.L2, Frame.METHOD, true, true, new SessionOpen(0)); - frame(Frame.L4, Frame.METHOD, true, false, - new QueueDeclare("asdf", "alternate", null, DURABLE)); - frame(Frame.L4, Frame.METHOD, false, false); - frame(Frame.L3, Frame.METHOD, true, true, - new ExchangeDeclare("exchange", "type", "alternate", null)); - frame(Frame.L4, Frame.METHOD, false, true); - frame(Frame.L4, Frame.HEADER, true, false); - frame(Frame.L4, Frame.HEADER, false, false); - frame(Frame.L4, Frame.HEADER, false, true); - frame(Frame.L4, Frame.BODY, true, false); - frame(Frame.L4, Frame.BODY, false, false); - frame(Frame.L4, Frame.BODY, false, false); - frame(Frame.L1, Frame.METHOD, true, true, - new ExchangeDeclare("exchange", "type", "alternate", null)); - frame(Frame.L4, Frame.BODY, false, false); - frame(Frame.L4, Frame.BODY, false, true); - } - -} - -class SessionDelegateStub extends SessionDelegate { - - public static final ConnectionDelegate source() - { - return new ConnectionDelegate() - { - public SessionDelegate getSessionDelegate() - { - return new SessionDelegateStub(); - } - }; - } - - public @Override void queueDeclare(Session session, QueueDeclare qd) { - System.out.println("got a queue declare: " + qd.getQueue()); - } - - public @Override void exchangeDeclare(Session session, ExchangeDeclare ed) { - System.out.println("got an exchange declare: " + ed.getExchange() + ", " + ed.getType()); - session.queueDeclare("asdf", "alternate", null); - } - - public void data(Session ssn, Frame frame) - { - System.out.println("got data: " + frame); - } - - public void headers(Session ssn, Struct ... headers) - { - System.out.println("got headers: " + headers); - } - -} diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java index 0a48a0a990..f69b72227c 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java @@ -20,7 +20,10 @@ */ package org.apache.qpidity; -import static org.apache.qpidity.Functions.str; +import org.apache.qpidity.transport.*; +import org.apache.qpidity.transport.network.mina.MinaHandler; + +import static org.apache.qpidity.transport.util.Functions.str; import java.io.IOException; import java.nio.ByteBuffer; @@ -44,10 +47,10 @@ class ToyBroker extends SessionDelegate private ToyExchange exchange; private MessageTransfer xfr = null; private DeliveryProperties props = null; - private Struct[] headers = null; - private List<Frame> frames = null; + private Header header = null; + private List<Data> body = null; private Map<String,Consumer> consumers = new ConcurrentHashMap<String,Consumer>(); - + public ToyBroker(ToyExchange exchange) { this.exchange = exchange; @@ -58,7 +61,7 @@ class ToyBroker extends SessionDelegate exchange.createQueue(qd.getQueue()); System.out.println("\n==================> declared queue: " + qd.getQueue() + "\n"); } - + @Override public void queueBind(Session ssn, QueueBind qb) { exchange.bindQueue(qb.getExchange(), qb.getRoutingKey(),qb.getQueue()); @@ -70,22 +73,22 @@ class ToyBroker extends SessionDelegate QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue()); ssn.executionResult(qq.getId(), result); } - + @Override public void messageSubscribe(Session ssn, MessageSubscribe ms) { Consumer c = new Consumer(); c._queueName = ms.getQueue(); consumers.put(ms.getDestination(),c); - System.out.println("\n==================> message subscribe : " + ms.getDestination() + " queue: " + ms.getQueue() + "\n"); - } - + System.out.println("\n==================> message subscribe : " + ms.getDestination() + " queue: " + ms.getQueue() + "\n"); + } + @Override public void messageFlow(Session ssn,MessageFlow struct) { Consumer c = consumers.get(struct.getDestination()); c._credit = struct.getValue(); System.out.println("\n==================> message flow : " + struct.getDestination() + " credit: " + struct.getValue() + "\n"); } - + @Override public void messageFlush(Session ssn,MessageFlush struct) { System.out.println("\n==================> message flush for consumer : " + struct.getDestination() + "\n"); @@ -95,47 +98,44 @@ class ToyBroker extends SessionDelegate @Override public void messageTransfer(Session ssn, MessageTransfer xfr) { this.xfr = xfr; - frames = new ArrayList<Frame>(); + body = new ArrayList<Data>(); System.out.println("received transfer " + xfr.getDestination()); } - public void headers(Session ssn, Struct ... headers) + @Override public void header(Session ssn, Header header) { - if (xfr == null || frames == null) + if (xfr == null || body == null) { ssn.connectionClose(503, "no method segment", 0, 0); - // XXX: close at our end + ssn.close(); return; } - for (Struct hdr : headers) + props = header.get(DeliveryProperties.class); + if (props != null) { - if (hdr instanceof DeliveryProperties) - { - props = (DeliveryProperties) hdr; - System.out.println("received headers routing_key " + props.getRoutingKey()); - } + System.out.println("received headers routing_key " + props.getRoutingKey()); } - - this.headers = headers; + + this.header = header; } - public void data(Session ssn, Frame frame) + @Override public void data(Session ssn, Data data) { - if (xfr == null || frames == null) + if (xfr == null || body == null) { ssn.connectionClose(503, "no method segment", 0, 0); - // XXX: close at our end + ssn.close(); return; } - frames.add(frame); + body.add(data); - if (frame.isLastSegment() && frame.isLastFrame()) + if (data.isLast()) { String dest = xfr.getDestination(); - Message m = new Message(headers, frames); - + Message m = new Message(header, body); + if (exchange.route(dest,props.getRoutingKey(),m)) { System.out.println("queued " + m); @@ -143,12 +143,12 @@ class ToyBroker extends SessionDelegate } else { - + reject(ssn); } ssn.processed(xfr); xfr = null; - frames = null; + body = null; } } @@ -165,22 +165,22 @@ class ToyBroker extends SessionDelegate ssn.messageReject(ranges, 0, "no such destination"); } } - + private void transferMessageToPeer(Session ssn,String dest, Message m) { System.out.println("\n==================> Transfering message to: " +dest + "\n"); ssn.messageTransfer(dest, (short)0, (short)0); - ssn.headers(m.headers); - for (Frame f : m.frames) + ssn.header(m.header); + for (Data d : m.body) { - for (ByteBuffer b : f) + for (ByteBuffer b : d.getFragments()) { ssn.data(b); } } ssn.endData(); } - + private void dispatchMessages(Session ssn) { for (String dest: consumers.keySet()) @@ -188,8 +188,8 @@ class ToyBroker extends SessionDelegate checkAndSendMessagesToConsumer(ssn,dest); } } - - private void checkAndSendMessagesToConsumer(Session ssn,String dest) + + private void checkAndSendMessagesToConsumer(Session ssn,String dest) { Consumer c = consumers.get(dest); LinkedBlockingQueue<Message> queue = exchange.getQueue(c._queueName); @@ -204,33 +204,33 @@ class ToyBroker extends SessionDelegate class Message { - private final Struct[] headers; - private final List<Frame> frames; + private final Header header; + private final List<Data> body; - public Message(Struct[] headers, List<Frame> frames) + public Message(Header header, List<Data> body) { - this.headers = headers; - this.frames = frames; + this.header = header; + this.body = body; } public String toString() { StringBuilder sb = new StringBuilder(); - if (headers != null) + if (header != null) { boolean first = true; - for (Struct hdr : headers) + for (Struct st : header.getStructs()) { if (first) { first = false; } else { sb.append(" "); } - sb.append(hdr); + sb.append(st); } } - for (Frame f : frames) + for (Data d : body) { - for (ByteBuffer b : f) + for (ByteBuffer b : d.getFragments()) { sb.append(" | "); sb.append(str(b)); @@ -241,7 +241,7 @@ class ToyBroker extends SessionDelegate } } - + // ugly, but who cares :) // assumes unit is always no of messages, not bytes // assumes it's credit mode and not window @@ -253,7 +253,7 @@ class ToyBroker extends SessionDelegate public static final void main(String[] args) throws IOException { - final ToyExchange exchange = new ToyExchange(); + final ToyExchange exchange = new ToyExchange(); ConnectionDelegate delegate = new ConnectionDelegate() { public SessionDelegate getSessionDelegate() @@ -261,11 +261,11 @@ class ToyBroker extends SessionDelegate return new ToyBroker(exchange); } }; - + //hack delegate.setUsername("guest"); delegate.setPassword("guest"); - + MinaHandler.accept("0.0.0.0", 5672, delegate); } diff --git a/java/common/src/main/java/org/apache/qpidity/ToyClient.java b/java/common/src/main/java/org/apache/qpidity/ToyClient.java index e325fb93be..d924c55c7b 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyClient.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyClient.java @@ -20,6 +20,9 @@ */ package org.apache.qpidity; +import org.apache.qpidity.transport.*; +import org.apache.qpidity.transport.network.mina.MinaHandler; + /** * ToyClient @@ -42,17 +45,17 @@ class ToyClient extends SessionDelegate } } - public void headers(Session ssn, Struct ... headers) + @Override public void header(Session ssn, Header header) { - for (Struct hdr : headers) + for (Struct st : header.getStructs()) { - System.out.println("header: " + hdr); + System.out.println("header: " + st); } } - public void data(Session ssn, Frame frame) + @Override public void data(Session ssn, Data data) { - System.out.println("got data: " + frame); + System.out.println("got data: " + data); } public static final void main(String[] args) @@ -65,7 +68,7 @@ class ToyClient extends SessionDelegate return new ToyClient(); } }); - conn.getOutputHandler().handle(conn.getHeader().toByteBuffer()); + conn.send(new ConnectionEvent(0, new ProtocolHeader(1, 0, 10))); Channel ch = conn.getChannel(0); Session ssn = new Session(); @@ -76,8 +79,8 @@ class ToyClient extends SessionDelegate ssn.sync(); ssn.messageTransfer("asdf", (short) 0, (short) 1); - ssn.headers(new DeliveryProperties(), - new MessageProperties()); + ssn.header(new DeliveryProperties(), + new MessageProperties()); ssn.data("this is the data"); ssn.endData(); @@ -88,6 +91,8 @@ class ToyClient extends SessionDelegate Future<QueueQueryResult> future = ssn.queueQuery("asdf"); System.out.println(future.get().getQueue()); + ssn.close(); + conn.close(); } } diff --git a/java/common/src/main/java/org/apache/qpidity/TrackSwitch.java b/java/common/src/main/java/org/apache/qpidity/TrackSwitch.java deleted file mode 100644 index 28a7d75f05..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/TrackSwitch.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - - -/** - * A TrackSwitch sends incoming frames to a different handler based on - * track. - * - * @author Rafael H. Schloming - */ - -class TrackSwitch<C> extends Switch<Byte,Event<C,Frame>> -{ - - public Byte resolve(Event<C,Frame> event) - { - return event.target.getTrack(); - } - -} diff --git a/java/common/src/main/java/org/apache/qpidity/TypeSwitch.java b/java/common/src/main/java/org/apache/qpidity/TypeSwitch.java deleted file mode 100644 index fc53b0b9b4..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/TypeSwitch.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - - -/** - * A TypeSwitch sends incoming frames to a different handler based on - * type. - * - * @author Rafael H. Schloming - */ - -class TypeSwitch<C> extends Switch<Byte,Event<C,Frame>> -{ - - public Byte resolve(Event<C,Frame> event) - { - return event.target.getType(); - } - -} diff --git a/java/common/src/main/java/org/apache/qpidity/api/Message.java b/java/common/src/main/java/org/apache/qpidity/api/Message.java index ab45ee01aa..afb17547ac 100644 --- a/java/common/src/main/java/org/apache/qpidity/api/Message.java +++ b/java/common/src/main/java/org/apache/qpidity/api/Message.java @@ -3,8 +3,8 @@ package org.apache.qpidity.api; import java.io.IOException; import java.nio.ByteBuffer; -import org.apache.qpidity.MessageProperties; -import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.transport.MessageProperties; +import org.apache.qpidity.transport.DeliveryProperties; /* * Licensed to the Apache Software Foundation (ASF) under one @@ -14,9 +14,9 @@ import org.apache.qpidity.DeliveryProperties; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -30,13 +30,13 @@ public interface Message public MessageProperties getMessageProperties(); public DeliveryProperties getDeliveryProperties(); - + /** * This will abstract the underlying message data. * The Message implementation may not hold all message * data in memory (especially in the case of large messages) - * - * The appendData function might write data to + * + * The appendData function might write data to * <ul> * <li> Memory (Ex: ByteBuffer) * <li> To Disk @@ -50,54 +50,54 @@ public interface Message * This will abstract the underlying message data. * The Message implementation may not hold all message * data in memory (especially in the case of large messages) - * - * The appendData function might write data to + * + * The appendData function might write data to * <ul> * <li> Memory (Ex: ByteBuffer) * <li> To Disk * <li> To Socket (Stream) * </ul> * @param src - the data to append - */ + */ public void appendData(ByteBuffer src) throws IOException; - + /** * This will abstract the underlying message data. * The Message implementation may not hold all message * data in memory (especially in the case of large messages) - * + * * The read function might copy data from * <ul> * <li> From memory (Ex: ByteBuffer) * <li> From Disk * <li> From Socket as and when it gets streamed * </ul> - * @param target The target byte[] which the data gets copied to + * @param target The target byte[] which the data gets copied to */ - public void readData(byte[] target) throws IOException; - + public void readData(byte[] target) throws IOException; + /** * * This will abstract the underlying message data. * The Message implementation may not hold all message * data in memory (especially in the case of large messages) - * + * * The read function might copy data from * <ul> * <li> From memory (Ex: ByteBuffer) * <li> From Disk * <li> From Socket as and when it gets streamed * </ul> - * + * * @return A ByteBuffer containing data * @throws IOException */ - public ByteBuffer readData() throws IOException; - + public ByteBuffer readData() throws IOException; + /** * This should clear the body of the message. */ public void clearData(); - + /** * The provides access to the command Id assigned to the * message transfer. @@ -107,15 +107,14 @@ public interface Message * you could use this id to accquire it. * <li>For releasing a message. You can use this id to release an acquired * message - * <li>For Acknowledging a message - You need to pass this ID, in order to + * <li>For Acknowledging a message - You need to pass this ID, in order to * acknowledge the message * <li>For Rejecting a message - You need to pass this ID, in order to reject - * the message. + * the message. * </ul> - * + * * @return the message transfer id. */ public long getMessageTransferId(); - -} +} diff --git a/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java b/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java index 134c9cdf47..cfb9dfbe92 100644 --- a/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java +++ b/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java @@ -23,10 +23,10 @@ package org.apache.qpidity.codec; import java.util.Map; import java.util.UUID; -import org.apache.qpidity.RangeSet; -import org.apache.qpidity.Struct; +import org.apache.qpidity.transport.RangeSet; +import org.apache.qpidity.transport.Struct; -import static org.apache.qpidity.Functions.*; +import static org.apache.qpidity.transport.util.Functions.*; /** diff --git a/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java b/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java index b6f875bc5d..0cc5a4157a 100644 --- a/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java +++ b/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java @@ -25,11 +25,11 @@ import java.nio.ByteBuffer; import java.util.Map; import java.util.UUID; -import org.apache.qpidity.Range; -import org.apache.qpidity.RangeSet; -import org.apache.qpidity.Struct; +import org.apache.qpidity.transport.Range; +import org.apache.qpidity.transport.RangeSet; +import org.apache.qpidity.transport.Struct; -import static org.apache.qpidity.Functions.*; +import static org.apache.qpidity.transport.util.Functions.*; /** diff --git a/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java b/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java index ec91c877f8..0d224056de 100644 --- a/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java +++ b/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java @@ -23,8 +23,8 @@ package org.apache.qpidity.codec; import java.util.Map; import java.util.UUID; -import org.apache.qpidity.RangeSet; -import org.apache.qpidity.Struct; +import org.apache.qpidity.transport.RangeSet; +import org.apache.qpidity.transport.Struct; /** diff --git a/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java b/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java index b879ed5cd7..9a79ec70fa 100644 --- a/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java +++ b/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java @@ -23,8 +23,8 @@ package org.apache.qpidity.codec; import java.util.Map; import java.util.UUID; -import org.apache.qpidity.RangeSet; -import org.apache.qpidity.Struct; +import org.apache.qpidity.transport.RangeSet; +import org.apache.qpidity.transport.Struct; /** diff --git a/java/common/src/main/java/org/apache/qpidity/codec/SegmentEncoder.java b/java/common/src/main/java/org/apache/qpidity/codec/SegmentEncoder.java deleted file mode 100644 index 8fb396f278..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/codec/SegmentEncoder.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.codec; - -import java.nio.BufferOverflowException; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - -import org.apache.qpidity.Frame; -import org.apache.qpidity.Handler; - -import static java.lang.Math.*; - -import static org.apache.qpidity.Frame.*; - - -/** - * SegmentEncoder - * - * @author Rafael H. Schloming - */ - -public class SegmentEncoder extends AbstractEncoder -{ - - private final Handler<ByteBuffer> handler; - private final int max; - private final byte flags; - private final byte track; - private final byte type; - private final int channel; - - private int remaining; - private ByteBuffer frame; - private boolean first; - - public SegmentEncoder(byte major, byte minor, Handler<ByteBuffer> handler, - int max, byte flags, byte track, byte type, - int channel, int remaining) - { - super(major, minor); - if (max < HEADER_SIZE + 1) - { - throw new IllegalArgumentException - ("max frame size must be large enough to include header"); - } - - this.handler = handler; - this.max = max; - this.flags = flags; - this.track = track; - this.type = type; - this.channel = channel; - this.remaining = remaining; - this.frame = null; - this.first = true; - } - - private void preWrite() { - if (remaining == 0) - { - throw new BufferOverflowException(); - } - - if (frame == null) - { - frame = ByteBuffer.allocate(min(max, remaining + HEADER_SIZE)); - frame.order(ByteOrder.BIG_ENDIAN); - - byte frameFlags = flags; - if (first) { frameFlags |= FIRST_FRAME; first = false; } - if (remaining <= (frame.remaining() - HEADER_SIZE)) - { - frameFlags |= LAST_FRAME; - } - - frame.put(frameFlags); - frame.put(type); - frame.putShort((short) frame.limit()); - frame.put(RESERVED); - frame.put(track); - frame.putShort((short) channel); - frame.put(RESERVED); - frame.put(RESERVED); - frame.put(RESERVED); - frame.put(RESERVED); - - assert frame.position() == HEADER_SIZE; - } - } - - private void postWrite() { - if (!frame.hasRemaining()) - { - frame.flip(); - handler.handle(frame); - frame = null; - } - } - - @Override public void put(byte b) - { - preWrite(); - frame.put(b); - remaining -= 1; - postWrite(); - } - - @Override public void put(ByteBuffer src) - { - if (src.remaining() > remaining) - { - throw new BufferOverflowException(); - } - - while (src.hasRemaining()) - { - preWrite(); - int limit = src.limit(); - src.limit(src.position() + min(frame.remaining(), src.remaining())); - remaining -= src.remaining(); - frame.put(src); - src.limit(limit); - postWrite(); - } - } - - public static final void main(String[] args) { - ByteBuffer buf = ByteBuffer.allocate(1024); - buf.put("AMQP_PROTOCOL_HEADER".getBytes()); - buf.flip(); - - SegmentEncoder enc = new SegmentEncoder((byte) 0, (byte) 10, - new Handler<ByteBuffer>() - { - public void handle(ByteBuffer frame) - { - System.out.println(frame); - } - }, - 16, - (byte) 0x0, - (byte) Frame.L1, - (byte) Frame.METHOD, - 0, - 7 + buf.remaining()); - enc.put((byte)0); - enc.put((byte)1); - enc.put((byte)2); - enc.put((byte)3); - enc.put((byte)4); - enc.put((byte)5); - enc.put((byte)6); - enc.put(buf); - } - -} diff --git a/java/common/src/main/java/org/apache/qpidity/ProtocolActions.java b/java/common/src/main/java/org/apache/qpidity/transport/AbstractDelegate.java index c658320989..a8059d669f 100644 --- a/java/common/src/main/java/org/apache/qpidity/ProtocolActions.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/AbstractDelegate.java @@ -18,22 +18,23 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpidity.transport; /** - * ProtocolActions + * AbstractDelegate * - * @author Rafael H. Schloming */ -interface ProtocolActions +class AbstractDelegate<C> { - void init(ProtocolHeader header); + public void init(C context, ProtocolHeader header) {} - void frame(Frame frame); + public void error(C context, ProtocolError error) {} - void error(ProtocolError error); + public void header(C context, Header header) {} + + public void data(C context, Data data) {} } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Channel.java b/java/common/src/main/java/org/apache/qpidity/transport/Channel.java new file mode 100644 index 0000000000..426f954c17 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/transport/Channel.java @@ -0,0 +1,182 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + +import java.nio.ByteBuffer; + +import java.util.List; +import java.util.ArrayList; + +import static org.apache.qpidity.transport.network.Frame.*; +import static org.apache.qpidity.transport.util.Functions.*; + + +/** + * Channel + * + * @author Rafael H. Schloming + */ + +public class Channel extends Invoker implements Receiver<ProtocolEvent> +{ + + final private Connection connection; + final private int channel; + final private Delegate<Channel> delegate; + final private SessionDelegate sessionDelegate; + // session may be null + private Session session; + + private boolean first = true; + private ByteBuffer data = null; + + public Channel(Connection connection, int channel, SessionDelegate delegate) + { + this.connection = connection; + this.channel = channel; + this.delegate = new ChannelDelegate(); + this.sessionDelegate = delegate; + } + + public Connection getConnection() + { + return connection; + } + + public void received(ProtocolEvent event) + { + switch (event.getEncodedTrack()) + { + case L1: + event.delegate(this, connection.getConnectionDelegate()); + break; + case L2: + event.delegate(this, delegate); + break; + case L3: + event.delegate(session, sessionDelegate); + break; + case L4: + // XXX + if (event instanceof Method) + { + Method method = (Method) event; + method.setId(session.nextCommandId()); + method.delegate(session, sessionDelegate); + if (!method.hasPayload()) + { + session.processed(method); + } + } + else + { + event.delegate(session, sessionDelegate); + } + break; + default: + throw new IllegalStateException + ("unknown track: " + event.getEncodedTrack()); + } + } + + public void closed() + { + System.out.println("channel closed: " + this); + } + + public void close() + { + connection.removeChannel(channel); + } + + public int getEncodedChannel() { + return channel; + } + + public Session getSession() + { + return session; + } + + void setSession(Session session) + { + this.session = session; + } + + private void emit(ProtocolEvent event) + { + connection.send(new ConnectionEvent(channel, event)); + } + + public void method(Method m) + { + emit(m); + + if (m.getEncodedTrack() != L4) + { + System.out.println("sent control " + m.getClass().getName()); + } + } + + public void header(Header header) + { + emit(header); + } + + public void data(ByteBuffer buf) + { + if (data != null) + { + emit(new Data(data, first, false)); + first = false; + } + + data = buf; + } + + public void data(String str) + { + data(str.getBytes()); + } + + public void data(byte[] bytes) + { + data(ByteBuffer.wrap(bytes)); + } + + public void end() + { + emit(new Data(data, first, true)); + first = true; + data = null; + } + + protected void invoke(Method m) + { + method(m); + } + + protected <T> Future<T> invoke(Method m, Class<T> cls) + { + throw new UnsupportedOperationException(); + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/ChannelDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java index d486868621..9d28b1a81e 100644 --- a/java/common/src/main/java/org/apache/qpidity/ChannelDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpidity.transport; import java.util.UUID; diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Connection.java b/java/common/src/main/java/org/apache/qpidity/transport/Connection.java new file mode 100644 index 0000000000..2c68b7c38b --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/transport/Connection.java @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + +import java.util.HashMap; +import java.util.Map; + +import java.nio.ByteBuffer; + + +/** + * Connection + * + * @author Rafael H. Schloming + * + * @todo the channels map should probably be replaced with something + * more efficient, e.g. an array or a map implementation that can use + * short instead of Short + */ + +// RA making this public until we sort out the package issues +public class Connection + implements Receiver<ConnectionEvent>, Sender<ConnectionEvent> +{ + + final private Sender<ConnectionEvent> sender; + final private ConnectionDelegate delegate; + + final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>(); + + public Connection(Sender<ConnectionEvent> sender, + ConnectionDelegate delegate) + { + this.sender = sender; + this.delegate = delegate; + } + + public ConnectionDelegate getConnectionDelegate() + { + return delegate; + } + + public void received(ConnectionEvent event) + { + Channel channel = getChannel(event.getChannel()); + channel.received(event.getProtocolEvent()); + } + + public void send(ConnectionEvent event) + { + sender.send(event); + } + + public Channel getChannel(int number) + { + synchronized (channels) + { + Channel channel = channels.get(number); + if (channel == null) + { + channel = new Channel(this, number, delegate.getSessionDelegate()); + channels.put(number, channel); + } + return channel; + } + } + + void removeChannel(int number) + { + synchronized (channels) + { + channels.remove(number); + } + } + + public void closed() + { + System.out.println("connection closed: " + this); + } + + public void close() + { + sender.close(); + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java index ff89567cee..d500cc6b81 100644 --- a/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java @@ -18,7 +18,10 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpidity.transport; + +import org.apache.qpidity.SecurityHelper; +import org.apache.qpidity.QpidException; import java.io.UnsupportedEncodingException; import java.util.HashMap; @@ -41,7 +44,7 @@ import javax.security.sasl.SaslServer; /** * Currently only implemented client specific methods * the server specific methods are dummy impls for testing - * + * * the connectionClose is kind of different for both sides */ public abstract class ConnectionDelegate extends Delegate<Channel> @@ -56,23 +59,47 @@ public abstract class ConnectionDelegate extends Delegate<Channel> private int maxFrame = 64*1024; private Condition _negotiationComplete; private Lock _negotiationCompleteLock; - + public abstract SessionDelegate getSessionDelegate(); - + public void setCondition(Lock negotiationCompleteLock,Condition negotiationComplete) { _negotiationComplete = negotiationComplete; _negotiationCompleteLock = negotiationCompleteLock; } - + + @Override public void init(Channel ch, ProtocolHeader hdr) + { + System.out.println(hdr); + // XXX: hardcoded version + if (hdr.getMajor() != 0 && hdr.getMinor() != 10) + { + // XXX + ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader(1, 0, 10))); + ch.getConnection().close(); + } + else + { + + System.out.println("\n--------------------Broker Start Connection Negotiation -----------------------\n"); + + ch.connectionStart(hdr.getMajor(), hdr.getMinor(), null, "PLAIN", "utf8"); + } + } + + @Override public void error(Channel ch, ProtocolError error) + { + throw new RuntimeException(error.getMessage()); + } + // ---------------------------------------------- - // Client side + // Client side //----------------------------------------------- - @Override public void connectionStart(Channel context, ConnectionStart struct) + @Override public void connectionStart(Channel context, ConnectionStart struct) { System.out.println("\n--------------------Client Start Connection Negotiation -----------------------\n"); System.out.println("The broker has sent connection-start"); - + String mechanism = null; String response = null; try @@ -94,15 +121,15 @@ public abstract class ConnectionDelegate extends Delegate<Channel> { // need error handling } - - Map<String,?> props = new HashMap<String,String>(); + + Map<String,?> props = new HashMap<String,String>(); context.connectionStartOk(props, mechanism, response, _locale); } - - @Override public void connectionSecure(Channel context, ConnectionSecure struct) + + @Override public void connectionSecure(Channel context, ConnectionSecure struct) { System.out.println("The broker has sent connection-secure with chanllenge " + struct.getChallenge()); - + try { String response = new String(saslClient.evaluateChallenge(struct.getChallenge().getBytes()),_locale); @@ -115,20 +142,20 @@ public abstract class ConnectionDelegate extends Delegate<Channel> catch (SaslException e) { // need error handling - } + } } - - @Override public void connectionTune(Channel context, ConnectionTune struct) + + @Override public void connectionTune(Channel context, ConnectionTune struct) { System.out.println("The broker has sent connection-tune " + struct.toString()); - + // should update the channel max given by the broker. - context.connectionTuneOk(struct.getChannelMax(), struct.getFrameMax(), struct.getHeartbeat()); + context.connectionTuneOk(struct.getChannelMax(), struct.getFrameMax(), struct.getHeartbeat()); context.connectionOpen(_virtualHost, null, Option.INSIST); } - - - @Override public void connectionOpenOk(Channel context, ConnectionOpenOk struct) + + + @Override public void connectionOpenOk(Channel context, ConnectionOpenOk struct) { String knownHosts = struct.getKnownHosts(); System.out.println("The broker has opened the connection for use"); @@ -147,23 +174,23 @@ public abstract class ConnectionDelegate extends Delegate<Channel> } System.out.println("\n-------------------- Client End Connection Negotiation -----------------------\n"); } - - public void connectionRedirect(Channel context, ConnectionRedirect struct) + + public void connectionRedirect(Channel context, ConnectionRedirect struct) { // not going to bother at the moment } - + // ---------------------------------------------- - // Server side + // Server side //----------------------------------------------- - @Override public void connectionStartOk(Channel context, ConnectionStartOk struct) + @Override public void connectionStartOk(Channel context, ConnectionStartOk struct) { //set the client side locale on the server side _locale = struct.getLocale(); _mechanism = struct.getMechanism(); - + System.out.println("The client has sent connection-start-ok"); - + //try //{ //saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",null,SecurityHelper.createCallbackHandler(_mechanism,_username,_password)); @@ -183,11 +210,11 @@ public abstract class ConnectionDelegate extends Delegate<Channel> } catch(Exception e) { - + } } - - + + /*} catch (SaslException e) { @@ -198,13 +225,13 @@ public abstract class ConnectionDelegate extends Delegate<Channel> // need error handling }*/ } - - @Override public void connectionTuneOk(Channel context, ConnectionTuneOk struct) + + @Override public void connectionTuneOk(Channel context, ConnectionTuneOk struct) { System.out.println("The client has excepted the tune params"); } - - @Override public void connectionSecureOk(Channel context, ConnectionSecureOk struct) + + @Override public void connectionSecureOk(Channel context, ConnectionSecureOk struct) { System.out.println("The client has sent connection-secure-ok"); try @@ -225,11 +252,11 @@ public abstract class ConnectionDelegate extends Delegate<Channel> } catch(Exception e) { - + } } - - + + } catch (SaslException e) { @@ -240,17 +267,16 @@ public abstract class ConnectionDelegate extends Delegate<Channel> // need error handling } } - - - @Override public void connectionOpen(Channel context, ConnectionOpen struct) + + + @Override public void connectionOpen(Channel context, ConnectionOpen struct) { String hosts = "amqp:1223243232325"; System.out.println("The client has sent connection-open"); context.connectionOpenOk(hosts); System.out.println("\n-------------------- Broker End Connection Negotiation -----------------------\n"); } - - + public String getPassword() { return _password; diff --git a/java/common/src/main/java/org/apache/qpidity/ProtocolError.java b/java/common/src/main/java/org/apache/qpidity/transport/ConnectionEvent.java index a4a83fad35..6ed0df57a7 100644 --- a/java/common/src/main/java/org/apache/qpidity/ProtocolError.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/ConnectionEvent.java @@ -18,30 +18,34 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpidity.transport; /** - * ProtocolError + * ConnectionEvent * - * @author Rafael H. Schloming */ -class ProtocolError +public class ConnectionEvent { - private final String format; - private final Object[] args; + private final int channel; + private final ProtocolEvent event; - public ProtocolError(String format, Object ... args) + public ConnectionEvent(int channel, ProtocolEvent event) { - this.format = format; - this.args = args; + this.channel = channel; + this.event = event; } - public String getMessage() + public int getChannel() { - return String.format(format, args); + return channel; + } + + public ProtocolEvent getProtocolEvent() + { + return event; } } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Data.java b/java/common/src/main/java/org/apache/qpidity/transport/Data.java new file mode 100644 index 0000000000..55cde84d5e --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/transport/Data.java @@ -0,0 +1,84 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + +import org.apache.qpidity.transport.network.Frame; + +import java.nio.ByteBuffer; + +import java.util.Collections; + + +/** + * Data + * + */ + +public class Data implements ProtocolEvent +{ + + private final Iterable<ByteBuffer> fragments; + private final boolean first; + private final boolean last; + + public Data(Iterable<ByteBuffer> fragments, boolean first, boolean last) + { + this.fragments = fragments; + this.first = first; + this.last = last; + } + + public Data(ByteBuffer buf, boolean first, boolean last) + { + this(Collections.singletonList(buf), first, last); + } + + public Iterable<ByteBuffer> getFragments() + { + return fragments; + } + + public boolean isFirst() + { + return first; + } + + public boolean isLast() + { + return last; + } + + public byte getEncodedTrack() + { + return Frame.L4; + } + + public <C> void delegate(C context, Delegate<C> delegate) + { + delegate.data(context, this); + } + + public <C> void delegate(C context, Switch sw) + { + sw.data(context, this); + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/Future.java b/java/common/src/main/java/org/apache/qpidity/transport/Future.java index 8902446e95..8936f06831 100644 --- a/java/common/src/main/java/org/apache/qpidity/Future.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Future.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpidity.transport; /** diff --git a/java/common/src/main/java/org/apache/qpidity/Switch.java b/java/common/src/main/java/org/apache/qpidity/transport/Header.java index 166dc33134..632dc137c1 100644 --- a/java/common/src/main/java/org/apache/qpidity/Switch.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Header.java @@ -18,41 +18,59 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpidity.transport; + +import org.apache.qpidity.transport.network.Frame; + +import java.util.List; -import java.util.HashMap; -import java.util.Map; /** - * A Switch performs generic event dispatch. + * Header * * @author Rafael H. Schloming */ -abstract class Switch<K,E> implements Handler<E> -{ +public class Header implements ProtocolEvent { - final private Map<K,Handler<E>> handlers = - new HashMap<K,Handler<E>>(); + private final List<Struct> structs; - public void map(K key, Handler<E> handler) + public Header(List<Struct> structs) { - handlers.put(key, handler); + this.structs = structs; } - public void handle(E event) + public List<Struct> getStructs() { - K key = resolve(event); - Handler<E> handler = handlers.get(key); - if (handler == null) + return structs; + } + + public <T> T get(Class<T> klass) + { + for (Struct st : structs) { - throw new IllegalStateException("no such key: " + key + - " this = " + this + - " handlers = " + handlers); + if (klass.isInstance(st)) + { + return klass.cast(st); + } } - handler.handle(event); + + return null; + } + + public byte getEncodedTrack() + { + return Frame.L4; } - abstract K resolve(E event); + public <C> void delegate(C context, Delegate<C> delegate) + { + delegate.header(context, this); + } + + public <C> void delegate(C context, Switch sw) + { + sw.header(context, this); + } } diff --git a/java/common/src/main/java/org/apache/qpidity/Method.java b/java/common/src/main/java/org/apache/qpidity/transport/Method.java index 43865d36aa..edd9116a73 100644 --- a/java/common/src/main/java/org/apache/qpidity/Method.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Method.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpidity.transport; /** @@ -27,7 +27,7 @@ package org.apache.qpidity; * @author Rafael H. Schloming */ -public abstract class Method extends Struct +public abstract class Method extends Struct implements ProtocolEvent { public static final Method create(int type) @@ -54,11 +54,9 @@ public abstract class Method extends Struct public abstract byte getEncodedTrack(); - // XXX: do we need a segment base type? - public byte getSegmentType() + public <C> void delegate(C context, Switch sw) { - // XXX - return Frame.METHOD; + sw.method(context, this); } } diff --git a/java/common/src/main/java/org/apache/qpidity/Segment.java b/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java index ee9f60fad8..cd9fb3b94a 100644 --- a/java/common/src/main/java/org/apache/qpidity/Segment.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java @@ -18,57 +18,55 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpidity.transport; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; - -import java.nio.ByteBuffer; - -import static org.apache.qpidity.Functions.*; +import org.apache.qpidity.transport.network.NetworkDelegate; +import org.apache.qpidity.transport.network.NetworkEvent; /** - * Segment + * ProtocolError * * @author Rafael H. Schloming */ -class Segment implements Iterable<ByteBuffer> +public class ProtocolError implements NetworkEvent, ProtocolEvent { - private final Collection<ByteBuffer> fragments = new ArrayList<ByteBuffer>(); + private final byte track; + private final String format; + private final Object[] args; - public void add(ByteBuffer fragment) + public ProtocolError(byte track, String format, Object ... args) { - fragments.add(fragment); + this.track = track; + this.format = format; + this.args = args; } - public Iterator<ByteBuffer> getFragments() + public byte getEncodedTrack() { - return new SliceIterator(fragments.iterator()); + return track; } - public Iterator<ByteBuffer> iterator() + public String getMessage() { - return getFragments(); + return String.format(format, args); } - public String toString() + public <C> void delegate(C context, Switch sw) { - StringBuilder str = new StringBuilder(); - String sep = " | "; - - for (ByteBuffer buf : this) - { - str.append(str(buf)); - str.append(sep); - } + sw.error(context, this); + } - str.setLength(str.length() - sep.length()); + public void delegate(NetworkDelegate delegate) + { + delegate.error(this); + } - return str.toString(); + public <C> void delegate(C context, Delegate<C> delegate) + { + delegate.error(context, this); } } diff --git a/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java b/java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java index f5a040166e..e2adefba9e 100644 --- a/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java @@ -18,30 +18,32 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpidity.transport; /** - * A MethodDispatcher parses and dispatches a method segment. + * ProtocolEvent * - * @author Rafael H. Schloming */ -class MethodDispatcher<C> implements Handler<Event<C,Method>> +public interface ProtocolEvent { - final private Delegate<C> delegate; - - public MethodDispatcher(Delegate<C> delegate) + public interface Switch<C> { - this.delegate = delegate; + void init(C context, ProtocolHeader header); + void method(C context, Method method); + void header(C context, Header header); + void data(C context, Data data); + void error(C context, ProtocolError error); } - public void handle(Event<C,Method> event) - { - Method method = event.target; - System.out.println("\nDelegating " + method.getClass().getName() + " to " + delegate.getClass().getName() + "\n"); - method.delegate(event.context, delegate); - } + // XXX: could do this switching with cascading defaults for the + // specific dispatch methods + <C> void delegate(C context, Switch sw); + + <C> void delegate(C context, Delegate<C> delegate); + + byte getEncodedTrack(); } diff --git a/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java b/java/common/src/main/java/org/apache/qpidity/transport/ProtocolHeader.java index 140d5ecbe3..50cae51171 100644 --- a/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/ProtocolHeader.java @@ -18,10 +18,14 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpidity.transport; import java.nio.ByteBuffer; +import org.apache.qpidity.transport.network.NetworkDelegate; +import org.apache.qpidity.transport.network.NetworkEvent; +import org.apache.qpidity.transport.network.Frame; + /** * ProtocolHeader @@ -31,7 +35,7 @@ import java.nio.ByteBuffer; //RA making this public until we sort out the package issues -public class ProtocolHeader +public class ProtocolHeader implements NetworkEvent, ProtocolEvent { private static final byte[] AMQP = {'A', 'M', 'Q', 'P' }; @@ -48,6 +52,11 @@ public class ProtocolHeader this.minor = minor; } + public ProtocolHeader(int instance, int major, int minor) + { + this((byte) instance, (byte) major, (byte) minor); + } + public byte getInstance() { return instance; @@ -63,6 +72,11 @@ public class ProtocolHeader return minor; } + public byte getEncodedTrack() + { + return Frame.L1; + } + public ByteBuffer toByteBuffer() { ByteBuffer buf = ByteBuffer.allocate(8); @@ -75,6 +89,21 @@ public class ProtocolHeader return buf; } + public <C> void delegate(C context, Switch sw) + { + sw.init(context, this); + } + + public void delegate(NetworkDelegate delegate) + { + delegate.init(this); + } + + public <C> void delegate(C context, Delegate<C> delegate) + { + delegate.init(context, this); + } + public String toString() { return String.format("AMQP.%d %d-%d", instance, major, minor); diff --git a/java/common/src/main/java/org/apache/qpidity/Range.java b/java/common/src/main/java/org/apache/qpidity/transport/Range.java index 9da7112a6d..ed745bf5ec 100644 --- a/java/common/src/main/java/org/apache/qpidity/Range.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Range.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpidity.transport; import static java.lang.Math.*; diff --git a/java/common/src/main/java/org/apache/qpidity/RangeSet.java b/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java index 2e79ee8a72..dfaec3702c 100644 --- a/java/common/src/main/java/org/apache/qpidity/RangeSet.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpidity.transport; import java.util.Collection; import java.util.Iterator; diff --git a/java/common/src/main/java/org/apache/qpidity/Delegator.java b/java/common/src/main/java/org/apache/qpidity/transport/Receiver.java index 00e769e0f2..65edb3a6ec 100644 --- a/java/common/src/main/java/org/apache/qpidity/Delegator.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Receiver.java @@ -18,18 +18,19 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpidity.transport; /** - * Delegator + * Receiver * - * @author Rafael H. Schloming */ -interface Delegator +public interface Receiver<T> { - <C> void delegate(C context, Delegate<C> delegate); + void received(T msg); + + void closed(); } diff --git a/java/common/src/main/java/org/apache/qpidity/Result.java b/java/common/src/main/java/org/apache/qpidity/transport/Result.java index 7fe6c869a4..2126a76a53 100644 --- a/java/common/src/main/java/org/apache/qpidity/Result.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Result.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpidity.transport; /** diff --git a/java/common/src/main/java/org/apache/qpidity/Header.java b/java/common/src/main/java/org/apache/qpidity/transport/Sender.java index 9b6373df19..6da8358bd6 100644 --- a/java/common/src/main/java/org/apache/qpidity/Header.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Sender.java @@ -18,13 +18,19 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpidity.transport; /** - * Header + * Sender * - * @author Rafael H. Schloming */ -public abstract class Header extends Struct {} +public interface Sender<T> +{ + + void send(T msg); + + void close(); + +} diff --git a/java/common/src/main/java/org/apache/qpidity/Session.java b/java/common/src/main/java/org/apache/qpidity/transport/Session.java index 6f0bd5c757..59e8daae31 100644 --- a/java/common/src/main/java/org/apache/qpidity/Session.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Session.java @@ -18,10 +18,14 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpidity.transport; + +import org.apache.qpidity.transport.network.Frame; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; @@ -42,12 +46,12 @@ public class Session extends Invoker // completed incoming commands private final RangeSet processed = new RangeSet(); private Range syncPoint = null; - + // outgoing command count private long commandsOut = 0; private Map<Long,Method> commands = new HashMap<Long,Method>(); private long mark = 0; - + public Map<Long,Method> getOutstandingCommands() { @@ -104,19 +108,19 @@ public class Session extends Invoker } void flushProcessed() - { + { for (Range r: processed) { - System.out.println("Completed Range [" + r.getLower() + "," + r.getUpper() +"]" ); + System.out.println("Completed Range [" + r.getLower() + "," + r.getUpper() +"]" ); } - System.out.println("Notifying peer with execution complete"); + System.out.println("Notifying peer with execution complete"); executionComplete(0, processed); } void syncPoint() { System.out.println("===========Request received to sync=========================="); - + Range range = new Range(0, getCommandsIn() - 1); boolean flush; synchronized (processed) @@ -154,8 +158,8 @@ public class Session extends Invoker for (long id = lower; id <= upper; id++) { commands.remove(id); - } - + } + if (commands.isEmpty()) { System.out.println("\n All outstanding commands are completed !!!! \n"); @@ -177,15 +181,25 @@ public class Session extends Invoker synchronized (commands) { System.out.println("sent command " + m.getClass().getName() + " command Id" + commandsOut); - commands.put(commandsOut++, m); + commands.put(commandsOut++, m); } } channel.method(m); } - public void headers(Struct ... headers) + public void header(Header header) + { + channel.header(header); + } + + public void header(List<Struct> structs) + { + header(new Header(structs)); + } + + public void header(Struct ... structs) { - channel.headers(headers); + header(Arrays.asList(structs)); } public void data(ByteBuffer buf) @@ -210,7 +224,7 @@ public class Session extends Invoker public void sync() { - System.out.println("calling sync()"); + System.out.println("calling sync()"); synchronized (commands) { if (!commands.isEmpty()) @@ -221,7 +235,7 @@ public class Session extends Invoker while (!commands.isEmpty()) { try { - System.out.println("\n============sync() waiting for commmands to be completed ==============\n"); + System.out.println("\n============sync() waiting for commmands to be completed ==============\n"); commands.wait(); System.out.println("\n============sync() got notified=========================================\n"); } @@ -314,4 +328,10 @@ public class Session extends Invoker } + public void close() + { + sessionClose(); + channel.close(); + } + } diff --git a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java index e6c107ced2..8b3c661075 100644 --- a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpidity.transport; /** @@ -30,9 +30,7 @@ package org.apache.qpidity; public abstract class SessionDelegate extends Delegate<Session> { - public abstract void headers(Session ssn, Struct ... headers); - - public abstract void data(Session ssn, Frame frame); + private static final Struct[] EMPTY_STRUCT_ARRAY = {}; @Override public void executionResult(Session ssn, ExecutionResult result) { @@ -47,7 +45,7 @@ public abstract class SessionDelegate extends Delegate<Session> for (Range range : ranges) { System.out.println("completed command range: " + range.getLower() + " to " + range.getUpper()); - ssn.complete(range.getLower(), range.getUpper()); + ssn.complete(range.getLower(), range.getUpper()); } } ssn.complete(excmp.getCumulativeExecutionMark()); diff --git a/java/common/src/main/java/org/apache/qpidity/Struct.java b/java/common/src/main/java/org/apache/qpidity/transport/Struct.java index 50da4910ab..f6464780e7 100644 --- a/java/common/src/main/java/org/apache/qpidity/Struct.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Struct.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpidity.transport; import org.apache.qpidity.codec.Encodable; @@ -29,7 +29,7 @@ import org.apache.qpidity.codec.Encodable; * @author Rafael H. Schloming */ -public abstract class Struct implements Delegator, Encodable +public abstract class Struct implements Encodable { public static Struct create(int type) diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java new file mode 100644 index 0000000000..e17b37bb36 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java @@ -0,0 +1,184 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport.network; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import java.nio.ByteBuffer; + +import org.apache.qpidity.codec.FragmentDecoder; + +import org.apache.qpidity.transport.ConnectionEvent; +import org.apache.qpidity.transport.Data; +import org.apache.qpidity.transport.Header; +import org.apache.qpidity.transport.Method; +import org.apache.qpidity.transport.ProtocolError; +import org.apache.qpidity.transport.ProtocolEvent; +import org.apache.qpidity.transport.ProtocolHeader; +import org.apache.qpidity.transport.Receiver; +import org.apache.qpidity.transport.Struct; + + +/** + * Assembler + * + */ + +public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate +{ + + private final Receiver<ConnectionEvent> receiver; + private final byte major; + private final byte minor; + private final Map<Integer,List<ByteBuffer>> segments; + + public Assembler(Receiver<ConnectionEvent> receiver, byte major, byte minor) + { + this.receiver = receiver; + this.major = major; + this.minor = minor; + segments = new HashMap<Integer,List<ByteBuffer>>(); + } + + private int segmentKey(Frame frame) + { + // XXX: can this overflow? + return (frame.getTrack() + 1) * frame.getChannel(); + } + + private List<ByteBuffer> getSegment(Frame frame) + { + return segments.get(segmentKey(frame)); + } + + private void setSegment(Frame frame, List<ByteBuffer> segment) + { + int key = segmentKey(frame); + if (segments.containsKey(key)) + { + error(new ProtocolError(Frame.L2, "segment in progress: %s", + frame)); + } + segments.put(segmentKey(frame), segment); + } + + private void clearSegment(Frame frame) + { + segments.remove(segmentKey(frame)); + } + + private void emit(int channel, ProtocolEvent event) + { + receiver.received(new ConnectionEvent(channel, event)); + } + + private void emit(Frame frame, ProtocolEvent event) + { + emit(frame.getChannel(), event); + } + + public void received(NetworkEvent event) + { + event.delegate(this); + } + + public void closed() + { + this.receiver.closed(); + } + + public void init(ProtocolHeader header) + { + emit(0, header); + } + + public void frame(Frame frame) + { + switch (frame.getType()) + { + case Frame.BODY: + emit(frame, new Data(frame, frame.isFirstFrame(), + frame.isLastFrame())); + break; + default: + assemble(frame); + break; + } + } + + public void error(ProtocolError error) + { + emit(0, error); + } + + private void assemble(Frame frame) + { + List<ByteBuffer> segment; + if (frame.isFirstFrame()) + { + segment = new ArrayList<ByteBuffer>(); + setSegment(frame, segment); + } + else + { + segment = getSegment(frame); + } + + for (ByteBuffer buf : frame) + { + segment.add(buf); + } + + if (frame.isLastFrame()) + { + clearSegment(frame); + emit(frame, decode(frame.getType(), segment)); + } + } + + private ProtocolEvent decode(byte type, List<ByteBuffer> segment) + { + FragmentDecoder dec = + new FragmentDecoder(major, minor, segment.iterator()); + + switch (type) + { + case Frame.METHOD: + int methodType = (int) dec.readLong(); + Method method = Method.create(methodType); + method.read(dec, major, minor); + return method; + case Frame.HEADER: + List<Struct> structs = new ArrayList(); + while (dec.hasRemaining()) + { + structs.add(dec.readLongStruct()); + } + return new Header(structs); + default: + throw new IllegalStateException("unknown frame type: " + type); + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java new file mode 100644 index 0000000000..73be9c3492 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java @@ -0,0 +1,168 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport.network; + +import org.apache.qpidity.codec.BBEncoder; +import org.apache.qpidity.codec.SizeEncoder; + +import org.apache.qpidity.transport.ConnectionEvent; +import org.apache.qpidity.transport.Data; +import org.apache.qpidity.transport.Header; +import org.apache.qpidity.transport.Method; +import org.apache.qpidity.transport.ProtocolHeader; +import org.apache.qpidity.transport.ProtocolError; +import org.apache.qpidity.transport.ProtocolEvent; +import org.apache.qpidity.transport.Sender; +import org.apache.qpidity.transport.Struct; + +import java.nio.ByteBuffer; + +import static org.apache.qpidity.transport.network.Frame.*; + +import static java.lang.Math.*; + + +/** + * Disassembler + * + */ + +public class Disassembler + implements Sender<ConnectionEvent>, ProtocolEvent.Switch<ConnectionEvent> +{ + + private final Sender<NetworkEvent> sender; + private final int maxFrame; + private final byte major; + private final byte minor; + + public Disassembler(Sender<NetworkEvent> sender, byte major, byte minor, + int maxFrame) + { + this.sender = sender; + this.major = major; + this.minor = minor; + this.maxFrame = maxFrame; + } + + public void send(ConnectionEvent event) + { + event.getProtocolEvent().delegate(event, this); + } + + public void close() + { + sender.close(); + } + + private void fragment(byte flags, byte type, ConnectionEvent event, + ByteBuffer buf, boolean first, boolean last) + { + while (buf.hasRemaining()) + { + ByteBuffer slice = buf.slice(); + slice.limit(min(maxFrame, slice.remaining())); + buf.position(buf.position() + slice.remaining()); + + byte newflags = flags; + if (first) + { + newflags |= FIRST_FRAME; + first = false; + } + if (last && !buf.hasRemaining()) + { + newflags |= LAST_FRAME; + } + + Frame frame = new Frame(newflags, type, + event.getProtocolEvent().getEncodedTrack(), + event.getChannel()); + frame.addFragment(slice); + sender.send(frame); + } + } + + public void init(ConnectionEvent event, ProtocolHeader header) + { + sender.send(header); + } + + public void method(ConnectionEvent event, Method method) + { + SizeEncoder sizer = new SizeEncoder(major, minor); + sizer.writeLong(method.getEncodedType()); + method.write(sizer, major, minor); + sizer.flush(); + int size = sizer.getSize(); + + ByteBuffer buf = ByteBuffer.allocate(size); + BBEncoder enc = new BBEncoder(major, minor, buf); + enc.writeLong(method.getEncodedType()); + method.write(enc, major, minor); + enc.flush(); + buf.flip(); + + byte flags = FIRST_SEG; + + if (!method.hasPayload()) + { + flags |= LAST_SEG; + } + + fragment(flags, METHOD, event, buf, true, true); + } + + public void header(ConnectionEvent event, Header header) + { + SizeEncoder sizer = new SizeEncoder(major, minor); + for (Struct st : header.getStructs()) + { + sizer.writeLongStruct(st); + } + + ByteBuffer buf = ByteBuffer.allocate(sizer.getSize()); + BBEncoder enc = new BBEncoder(major, minor, buf); + for (Struct st : header.getStructs()) + { + enc.writeLongStruct(st); + enc.flush(); + } + buf.flip(); + + fragment((byte) 0x0, HEADER, event, buf, true, true); + } + + public void data(ConnectionEvent event, Data data) + { + for (ByteBuffer buf : data.getFragments()) + { + fragment(LAST_SEG, BODY, event, buf, data.isFirst(), + data.isLast()); + } + } + + public void error(ConnectionEvent event, ProtocolError error) + { + sender.send(error); + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/Frame.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java index 89e7579cb3..c36b03b104 100644 --- a/java/common/src/main/java/org/apache/qpidity/Frame.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java @@ -18,7 +18,9 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpidity.transport.network; + +import org.apache.qpidity.transport.util.SliceIterator; import java.nio.ByteBuffer; @@ -26,7 +28,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Iterator; -import static org.apache.qpidity.Functions.*; +import static org.apache.qpidity.transport.util.Functions.*; /** @@ -36,7 +38,7 @@ import static org.apache.qpidity.Functions.*; */ // RA: changed it to public until we sort the package issues -public class Frame implements Iterable<ByteBuffer> +public class Frame implements NetworkEvent, Iterable<ByteBuffer> { public static final int HEADER_SIZE = 12; @@ -82,6 +84,11 @@ public class Frame implements Iterable<ByteBuffer> size += fragment.remaining(); } + public byte getFlags() + { + return flags; + } + public int getChannel() { return channel; @@ -137,6 +144,11 @@ public class Frame implements Iterable<ByteBuffer> return getFragments(); } + public void delegate(NetworkDelegate delegate) + { + delegate.frame(this); + } + public String toString() { StringBuilder str = new StringBuilder(); diff --git a/java/common/src/main/java/org/apache/qpidity/InputHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java index 78cb3d1b60..191f900c02 100644 --- a/java/common/src/main/java/org/apache/qpidity/InputHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java @@ -18,11 +18,15 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpidity.transport.network; import java.nio.ByteBuffer; -import static org.apache.qpidity.InputHandler.State.*; +import org.apache.qpidity.transport.ProtocolError; +import org.apache.qpidity.transport.ProtocolHeader; +import org.apache.qpidity.transport.Receiver; + +import static org.apache.qpidity.transport.network.InputHandler.State.*; /** @@ -31,7 +35,7 @@ import static org.apache.qpidity.InputHandler.State.*; * @author Rafael H. Schloming */ -class InputHandler implements Handler<ByteBuffer> +public class InputHandler implements Receiver<ByteBuffer> { public enum State @@ -61,7 +65,7 @@ class InputHandler implements Handler<ByteBuffer> ERROR; } - private final ProtocolActions actions; + private final Receiver<NetworkEvent> receiver; private State state; private byte instance; @@ -75,35 +79,35 @@ class InputHandler implements Handler<ByteBuffer> private int size; private Frame frame; - public InputHandler(ProtocolActions actions, State state) + public InputHandler(Receiver<NetworkEvent> receiver, State state) { - this.actions = actions; + this.receiver = receiver; this.state = state; } - public InputHandler(ProtocolActions actions) + public InputHandler(Receiver<NetworkEvent> receiver) { - this(actions, PROTO_HDR); + this(receiver, PROTO_HDR); } private void init() { - actions.init(new ProtocolHeader(instance, major, minor)); + receiver.received(new ProtocolHeader(instance, major, minor)); } private void frame() { assert size == frame.getSize(); - actions.frame(frame); + receiver.received(frame); frame = null; } private void error(String fmt, Object ... args) { - actions.error(new ProtocolError(fmt, args)); + receiver.received(new ProtocolError(Frame.L1, fmt, args)); } - public void handle(ByteBuffer buf) + public void received(ByteBuffer buf) { while (buf.hasRemaining()) { @@ -227,4 +231,9 @@ class InputHandler implements Handler<ByteBuffer> } } + public void closed() + { + receiver.closed(); + } + } diff --git a/java/common/src/main/java/org/apache/qpidity/ProtocolHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkDelegate.java index 526ef50211..48655edd0c 100644 --- a/java/common/src/main/java/org/apache/qpidity/ProtocolHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkDelegate.java @@ -18,16 +18,19 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpidity.transport.network; + +import org.apache.qpidity.transport.ProtocolError; +import org.apache.qpidity.transport.ProtocolHeader; /** - * ProtocolHandler + * NetworkDelegate * * @author Rafael H. Schloming */ -interface ProtocolHandler +public interface NetworkDelegate { void init(ProtocolHeader header); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkEvent.java b/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkEvent.java new file mode 100644 index 0000000000..080efee704 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkEvent.java @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport.network; + + +/** + * NetworkEvent + * + */ + +public interface NetworkEvent +{ + + void delegate(NetworkDelegate delegate); + +} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java new file mode 100644 index 0000000000..90bef36790 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport.network; + +import java.nio.ByteBuffer; + +import org.apache.qpidity.transport.ProtocolError; +import org.apache.qpidity.transport.ProtocolHeader; +import org.apache.qpidity.transport.Sender; + +import static org.apache.qpidity.transport.network.Frame.*; + + +/** + * OutputHandler + * + */ + +public class OutputHandler implements Sender<NetworkEvent>, NetworkDelegate +{ + + private Sender<ByteBuffer> sender; + private Object lock = new Object(); + + public OutputHandler(Sender<ByteBuffer> sender) + { + this.sender = sender; + } + + public void send(NetworkEvent event) + { + event.delegate(this); + } + + public void close() + { + synchronized (lock) + { + sender.close(); + } + } + + public void init(ProtocolHeader header) + { + synchronized (lock) + { + sender.send(header.toByteBuffer()); + } + } + + public void frame(Frame frame) + { + ByteBuffer hdr = ByteBuffer.allocate(HEADER_SIZE); + hdr.put(frame.getFlags()); + hdr.put(frame.getType()); + hdr.putShort((short) (frame.getSize() + HEADER_SIZE)); + hdr.put(RESERVED); + hdr.put(frame.getTrack()); + hdr.putShort((short) frame.getChannel()); + hdr.put(RESERVED); + hdr.put(RESERVED); + hdr.put(RESERVED); + hdr.put(RESERVED); + hdr.flip(); + + synchronized (lock) + { + sender.send(hdr); + for (ByteBuffer buf : frame) + { + sender.send(buf); + } + } + } + + public void error(ProtocolError error) + { + throw new IllegalStateException("XXX"); + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/MinaHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java index f255b56d0b..ac9dab615d 100644 --- a/java/common/src/main/java/org/apache/qpidity/MinaHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpidity.transport.network.mina; import java.io.IOException; import java.net.InetSocketAddress; @@ -28,7 +28,6 @@ import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoAcceptor; -import org.apache.mina.common.IoConnector; import org.apache.mina.common.IoHandler; import org.apache.mina.common.IoSession; import org.apache.mina.common.SimpleByteBufferAllocator; @@ -36,6 +35,16 @@ import org.apache.mina.common.SimpleByteBufferAllocator; import org.apache.mina.transport.socket.nio.SocketAcceptor; import org.apache.mina.transport.socket.nio.SocketConnector; +import org.apache.qpidity.transport.Connection; +import org.apache.qpidity.transport.ConnectionDelegate; +import org.apache.qpidity.transport.Receiver; +import org.apache.qpidity.transport.Sender; + +import org.apache.qpidity.transport.network.Assembler; +import org.apache.qpidity.transport.network.Disassembler; +import org.apache.qpidity.transport.network.InputHandler; +import org.apache.qpidity.transport.network.OutputHandler; + /** * MinaHandler @@ -57,9 +66,9 @@ public class MinaHandler implements IoHandler public void messageReceived(IoSession ssn, Object obj) { - Connection conn = (Connection) ssn.getAttachment(); + Attachment attachment = (Attachment) ssn.getAttachment(); ByteBuffer buf = (ByteBuffer) obj; - conn.getInputHandler().handle(buf.buf()); + attachment.receiver.received(buf.buf()); } public void messageSent(IoSession ssn, Object obj) @@ -80,16 +89,15 @@ public class MinaHandler implements IoHandler public void sessionOpened(final IoSession ssn) { System.out.println("opened " + ssn); - Connection conn = new Connection(new Handler<java.nio.ByteBuffer>() - { - public void handle(java.nio.ByteBuffer buf) - { - ssn.write(ByteBuffer.wrap(buf)); - } - }, - delegate, - state); - ssn.setAttachment(conn); + // XXX: hardcoded version + max-frame + Connection conn = new Connection + (new Disassembler(new OutputHandler(new MinaSender(ssn)), + (byte)0, (byte)10, 64*1024), + delegate); + // XXX: hardcoded version + Receiver<java.nio.ByteBuffer> receiver = + new InputHandler(new Assembler(conn, (byte)0, (byte)10), state); + ssn.setAttachment(new Attachment(conn, receiver)); // XXX synchronized (ssn) { @@ -100,21 +108,27 @@ public class MinaHandler implements IoHandler public void sessionClosed(IoSession ssn) { System.out.println("closed " + ssn); + Attachment attachment = (Attachment) ssn.getAttachment(); + attachment.receiver.closed(); ssn.setAttachment(null); } public void sessionIdle(IoSession ssn, IdleStatus status) { - System.out.println(status); + // do nothing } - public static final void main(String[] args) throws IOException + private class Attachment { - ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); - if (args[0].equals("accept")) { - accept("0.0.0.0", 5672, SessionDelegateStub.source()); - } else if (args[0].equals("connect")) { - connect("0.0.0.0", 5672, SessionDelegateStub.source()); + + Connection connection; + Receiver<java.nio.ByteBuffer> receiver; + + Attachment(Connection connection, + Receiver<java.nio.ByteBuffer> receiver) + { + this.connection = connection; + this.receiver = receiver; } } @@ -124,8 +138,7 @@ public class MinaHandler implements IoHandler { IoAcceptor acceptor = new SocketAcceptor(); acceptor.bind(new InetSocketAddress(host, port), - new MinaHandler(delegate, InputHandler.State.PROTO_HDR)); - + new MinaHandler(delegate, InputHandler.State.PROTO_HDR)); } public static final Connection connect(String host, int port, @@ -134,7 +147,8 @@ public class MinaHandler implements IoHandler MinaHandler handler = new MinaHandler(delegate, InputHandler.State.FRAME_HDR); SocketAddress addr = new InetSocketAddress(host, port); - IoConnector connector = new SocketConnector(); + SocketConnector connector = new SocketConnector(); + connector.setWorkerTimeout(0); ConnectFuture cf = connector.connect(addr, handler); cf.join(); IoSession ssn = cf.getSession(); @@ -153,8 +167,8 @@ public class MinaHandler implements IoHandler } } } - Connection conn = (Connection) ssn.getAttachment(); - return conn; + Attachment attachment = (Attachment) ssn.getAttachment(); + return attachment.connection; } } diff --git a/java/common/src/main/java/org/apache/qpidity/BodyHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java index 26e654d3a7..54e9ec28ef 100644 --- a/java/common/src/main/java/org/apache/qpidity/BodyHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java @@ -18,29 +18,37 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpidity.transport.network.mina; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoSession; + +import org.apache.qpidity.transport.Sender; /** - * BodyHandler + * MinaSender * - * @author Rafael H. Schloming */ -class BodyHandler implements Handler<Event<Session,Frame>> +public class MinaSender implements Sender<java.nio.ByteBuffer> { - private final SessionDelegate delegate; + private final IoSession session; + + public MinaSender(IoSession session) + { + this.session = session; + } - public BodyHandler(SessionDelegate delegate) + public void send(java.nio.ByteBuffer buf) { - this.delegate = delegate; + session.write(ByteBuffer.wrap(buf)); } - public void handle(Event<Session,Frame> event) + public void close() { - System.out.println("got body frame: " + event.target); - delegate.data(event.context, event.target); + session.close(); } } diff --git a/java/common/src/main/java/org/apache/qpidity/Functions.java b/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java index 1008966b01..fb1d4ccddf 100644 --- a/java/common/src/main/java/org/apache/qpidity/Functions.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpidity.transport.util; import java.nio.ByteBuffer; diff --git a/java/common/src/main/java/org/apache/qpidity/SliceIterator.java b/java/common/src/main/java/org/apache/qpidity/transport/util/SliceIterator.java index 9b4a8f90f7..32392a3561 100644 --- a/java/common/src/main/java/org/apache/qpidity/SliceIterator.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/util/SliceIterator.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpidity.transport.util; import java.nio.ByteBuffer; @@ -31,7 +31,7 @@ import java.util.Iterator; * @author Rafael H. Schloming */ -class SliceIterator implements Iterator<ByteBuffer> +public class SliceIterator implements Iterator<ByteBuffer> { final private Iterator<ByteBuffer> iterator; diff --git a/java/plugins/src/main/java/org/apache/qpid/plugins/JythonMojo.java b/java/plugins/src/main/java/org/apache/qpid/plugins/JythonMojo.java index 1788f471d7..50af9c257e 100644 --- a/java/plugins/src/main/java/org/apache/qpid/plugins/JythonMojo.java +++ b/java/plugins/src/main/java/org/apache/qpid/plugins/JythonMojo.java @@ -21,6 +21,7 @@ package org.apache.qpid.plugins; import java.io.File; +import java.io.IOException; import org.apache.maven.plugin.AbstractMojo; import org.apache.maven.plugin.MojoExecutionException; @@ -46,9 +47,44 @@ public class JythonMojo extends AbstractMojo */ private String[] params = new String[0]; + /** + * Source file. + * + * @parameter + */ + private File source; + + /** + * Optional timestamp. + * + * @parameter + */ + private File timestamp; + public void execute() throws MojoExecutionException { + if (source != null && timestamp != null) + { + if (timestamp.lastModified() > source.lastModified()) + { + return; + } + } + jython.main(params); + + if (timestamp != null) + { + try + { + timestamp.createNewFile(); + } + catch (IOException e) + { + throw new MojoExecutionException("cannot create timestamp", e); + } + timestamp.setLastModified(System.currentTimeMillis()); + } } } |