summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-08-01 22:27:55 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-08-01 22:27:55 +0000
commit72918521dab761d149d2af02eaea892eb45eb4be (patch)
treebdf35f1b670000d43e775d78731b5cb7a669829b
parent84eb385ddc52f63a8fc28f25dbe8af2c751a8eb6 (diff)
downloadqpid-python-72918521dab761d149d2af02eaea892eb45eb4be.tar.gz
added wiring to Rafi's comm stack
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@561981 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xjava/common/generate1
-rw-r--r--java/common/src/main/java/org/apache/qpidity/CommonSessionDelegate.java33
-rw-r--r--java/common/src/main/java/org/apache/qpidity/HeaderHandler.java1
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Session.java64
4 files changed, 99 insertions, 0 deletions
diff --git a/java/common/generate b/java/common/generate
index d9ea47aeca..f3464e7310 100755
--- a/java/common/generate
+++ b/java/common/generate
@@ -286,3 +286,4 @@ for pset in ("DeliveryProperties", "ApplicationProperties"):
hdrs = Output(out_dir, out_pkg, pset)
hdrs.line("public interface %s extends Header {}" % pset)
hdrs.write()
+
diff --git a/java/common/src/main/java/org/apache/qpidity/CommonSessionDelegate.java b/java/common/src/main/java/org/apache/qpidity/CommonSessionDelegate.java
index e9173a5c5b..3bca6116a4 100644
--- a/java/common/src/main/java/org/apache/qpidity/CommonSessionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpidity/CommonSessionDelegate.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpidity;
+import org.apache.qpidity.api.StreamingMessageListener;
+
/**
* CommonSessionDelegate
@@ -48,4 +50,35 @@ public class CommonSessionDelegate extends Delegate<Session>
@Override public void sessionDetached(Session session, SessionDetached struct) {}
+ @Override
+ public void messageTransfer(Session context, MessageTransfer struct)
+ {
+ StreamingMessageListener l = context.messagListeners.get(struct.getDestination());
+ l.messageTransfer(struct.getDestination(),new Option[0]);
+ }
+
+ // ---------------------------------------------------------------
+ // Non generated methods - but would like if they are also generated.
+ // These methods should be called from Body and Header Handlers.
+ // If these methods are generated as part of the delegate then
+ // I can call these methods from the BodyHandler and HeaderHandler
+ // in a generic way
+ // ----------------------------------------------------------------
+ public void data(Session context,String destination,byte[] src) throws QpidException
+ {
+ StreamingMessageListener l = context.messagListeners.get(destination);
+ l.data(src);
+ }
+
+ public void endData(Session context,String destination) throws QpidException
+ {
+ StreamingMessageListener l = context.messagListeners.get(destination);
+ l.endData();
+ }
+
+ public void messageHeaders(Session context,String destination,Header... headers) throws QpidException
+ {
+ StreamingMessageListener l = context.messagListeners.get(destination);
+ l.endData();
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpidity/HeaderHandler.java b/java/common/src/main/java/org/apache/qpidity/HeaderHandler.java
index a63c032e38..8a407cd9f6 100644
--- a/java/common/src/main/java/org/apache/qpidity/HeaderHandler.java
+++ b/java/common/src/main/java/org/apache/qpidity/HeaderHandler.java
@@ -33,6 +33,7 @@ class HeaderHandler<C> implements Handler<Event<C,Segment>>
public void handle(Event<C,Segment> event)
{
System.out.println("got header segment:\n " + event.target);
+
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/Session.java b/java/common/src/main/java/org/apache/qpidity/Session.java
index 7d74849b0b..48860309a0 100644
--- a/java/common/src/main/java/org/apache/qpidity/Session.java
+++ b/java/common/src/main/java/org/apache/qpidity/Session.java
@@ -23,6 +23,9 @@ package org.apache.qpidity;
import java.util.HashMap;
import java.util.Map;
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.api.StreamingMessageListener;
+
/**
* Session
*
@@ -37,6 +40,8 @@ public class Session extends Invoker
private int command_id = 0;
// XXX
final Map<Integer,Handler<Struct>> handlers = new HashMap<Integer,Handler<Struct>>();
+
+ Map<String,StreamingMessageListener> messagListeners = new HashMap<String,StreamingMessageListener>();
public void attach(Channel channel)
{
@@ -61,4 +66,63 @@ public class Session extends Invoker
return channel.getFactory();
}
+ // -----------------------------------------
+ // Messaging Methods
+ // ------------------------------------------
+ public void messageTransfer(String destination, Message msg) throws QpidException
+ {
+
+ }
+
+ public void data(byte[] src) throws QpidException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void endData() throws QpidException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void messageHeaders(Header... headers) throws QpidException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void messageTransfer(String destination,Option... options) throws QpidException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void messageAcknowledge() throws QpidException
+ {
+ // TODO Auto-generated method stub
+ }
+
+ public boolean messageAcquire() throws QpidException
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ public void messageReject() throws QpidException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void messageRelease() throws QpidException
+ {
+ // TODO Auto-generated method stub
+ }
+
+ public void addMessageListener(String destination,StreamingMessageListener listener)
+ {
+ messagListeners.put(destination, listener);
+ }
+
}