summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2006-10-03 15:11:24 +0000
committerMartin Ritchie <ritchiem@apache.org>2006-10-03 15:11:24 +0000
commit587542968e8f262b077d22bd4556044a5d65bef9 (patch)
treecafff79bf70073c2c4c1cd978cab61a810be6a56
parenta6e80e575a8ca66f0f4cc9bf6ae1ec4b37c073e9 (diff)
downloadqpid-python-587542968e8f262b077d22bd4556044a5d65bef9.tar.gz
client/* - Only Create a Threshold Listener if if the acknowledgeMode is NO_ACK
common/*/framing/* - White space changes from tabs to 4 spaces. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@452529 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/org/apache/qpid/client/AMQSession.java7
-rw-r--r--java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java19
-rw-r--r--java/common/src/org/apache/qpid/framing/AMQDataBlockEncoder.java4
-rw-r--r--java/common/src/org/apache/qpid/framing/EncodingUtils.java350
4 files changed, 204 insertions, 176 deletions
diff --git a/java/client/src/org/apache/qpid/client/AMQSession.java b/java/client/src/org/apache/qpid/client/AMQSession.java
index 4768399036..3bc670e609 100644
--- a/java/client/src/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/org/apache/qpid/client/AMQSession.java
@@ -220,6 +220,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_channelId = channelId;
_messageFactoryRegistry = messageFactoryRegistry;
_defaultPrefetch = defaultPrefetch;
+ if (_acknowledgeMode == NO_ACKNOWLEDGE)
+ {
_queue = new FlowControllingBlockingQueue(_defaultPrefetch,
new FlowControllingBlockingQueue.ThresholdListener()
{
@@ -241,6 +243,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
});
+ }
+ else
+ {
+ _queue = new FlowControllingBlockingQueue(_defaultPrefetch,null);
+ }
}
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode)
diff --git a/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
index ad2ca7b731..89e6968e44 100644
--- a/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
+++ b/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
@@ -63,11 +63,14 @@ public class FlowControllingBlockingQueue
public Object take() throws InterruptedException
{
Object o = _queue.take();
- synchronized (_listener)
+ if (_listener != null)
{
- if (--_count == (_flowControlThreshold - 1))
+ synchronized(_listener)
{
- _listener.underThreshold(_count);
+ if (--_count == (_flowControlThreshold - 1))
+ {
+ _listener.underThreshold(_count);
+ }
}
}
return o;
@@ -76,12 +79,16 @@ public class FlowControllingBlockingQueue
public void add(Object o)
{
_queue.add(o);
- synchronized (_listener)
+ if (_listener != null)
{
- if (++_count == _flowControlThreshold)
+ synchronized(_listener)
{
- _listener.aboveThreshold(_count);
+ if (++_count == _flowControlThreshold)
+ {
+ _listener.aboveThreshold(_count);
+ }
}
}
}
}
+
diff --git a/java/common/src/org/apache/qpid/framing/AMQDataBlockEncoder.java b/java/common/src/org/apache/qpid/framing/AMQDataBlockEncoder.java
index b0430afac9..9c0ca26dcf 100644
--- a/java/common/src/org/apache/qpid/framing/AMQDataBlockEncoder.java
+++ b/java/common/src/org/apache/qpid/framing/AMQDataBlockEncoder.java
@@ -20,8 +20,8 @@ package org.apache.qpid.framing;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.codec.demux.MessageEncoder;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+import org.apache.mina.filter.codec.demux.MessageEncoder;
import java.util.HashSet;
import java.util.Set;
@@ -48,7 +48,7 @@ public class AMQDataBlockEncoder implements MessageEncoder
if (_logger.isDebugEnabled())
{
- _logger.debug("Encoded frame byte-buffer is '" + EncodingUtils.convertToHexString(buffer) + "'");
+ _logger.debug("Encoded frame byte-buffer is '" + EncodingUtils.convertToHexString(buffer) + "'");
}
buffer.flip();
diff --git a/java/common/src/org/apache/qpid/framing/EncodingUtils.java b/java/common/src/org/apache/qpid/framing/EncodingUtils.java
index 5b35044b25..65deb61dfb 100644
--- a/java/common/src/org/apache/qpid/framing/EncodingUtils.java
+++ b/java/common/src/org/apache/qpid/framing/EncodingUtils.java
@@ -24,7 +24,7 @@ import java.nio.charset.Charset;
public class EncodingUtils
{
- private static final Logger _logger = Logger.getLogger(EncodingUtils.class);
+ private static final Logger _logger = Logger.getLogger(EncodingUtils.class);
private static final String STRING_ENCODING = "iso8859-15";
@@ -91,7 +91,7 @@ public class EncodingUtils
else
{
// size of the table plus 4 octets for the size
- return (int)table.getEncodedSize() + 4;
+ return (int) table.getEncodedSize() + 4;
}
}
@@ -106,7 +106,7 @@ public class EncodingUtils
encodedString[i] = (byte) cha[i];
}
// TODO: check length fits in an unsigned byte
- writeUnsignedByte(buffer, (short)encodedString.length);
+ writeUnsignedByte(buffer, (short) encodedString.length);
buffer.put(encodedString);
}
else
@@ -179,37 +179,37 @@ public class EncodingUtils
public static void writeUnsignedShort(ByteBuffer buffer, int s)
{
- // TODO: Is this comparison safe? Do I need to cast RHS to long?
- if (s < Short.MAX_VALUE)
- {
- buffer.putShort((short)s);
- }
- else
- {
- short sv = (short) s;
- buffer.put((byte) (0xFF & (sv >> 8)));
- buffer.put((byte)(0xFF & sv));
- }
+ // TODO: Is this comparison safe? Do I need to cast RHS to long?
+ if (s < Short.MAX_VALUE)
+ {
+ buffer.putShort((short) s);
+ }
+ else
+ {
+ short sv = (short) s;
+ buffer.put((byte) (0xFF & (sv >> 8)));
+ buffer.put((byte) (0xFF & sv));
+ }
}
public static void writeUnsignedInteger(ByteBuffer buffer, long l)
{
- // TODO: Is this comparison safe? Do I need to cast RHS to long?
- if (l < Integer.MAX_VALUE)
- {
- buffer.putInt((int)l);
- }
- else
- {
- int iv = (int) l;
-
- // FIXME: This *may* go faster if we build this into a local 4-byte array and then
- // put the array in a single call.
- buffer.put((byte) (0xFF & (iv >> 24)));
- buffer.put((byte) (0xFF & (iv >> 16)));
- buffer.put((byte) (0xFF & (iv >> 8 )));
- buffer.put((byte) (0xFF & iv));
- }
+ // TODO: Is this comparison safe? Do I need to cast RHS to long?
+ if (l < Integer.MAX_VALUE)
+ {
+ buffer.putInt((int) l);
+ }
+ else
+ {
+ int iv = (int) l;
+
+ // FIXME: This *may* go faster if we build this into a local 4-byte array and then
+ // put the array in a single call.
+ buffer.put((byte) (0xFF & (iv >> 24)));
+ buffer.put((byte) (0xFF & (iv >> 16)));
+ buffer.put((byte) (0xFF & (iv >> 8)));
+ buffer.put((byte) (0xFF & iv));
+ }
}
public static void writeFieldTableBytes(ByteBuffer buffer, FieldTable table)
@@ -231,7 +231,7 @@ public class EncodingUtils
{
if (values[i])
{
- packedValue = (byte)(packedValue | (1 << i));
+ packedValue = (byte) (packedValue | (1 << i));
}
}
@@ -255,7 +255,7 @@ public class EncodingUtils
writeUnsignedInteger(buffer, 0);
}
}
-
+
public static void writeTimestamp(ByteBuffer buffer, long timestamp)
{
writeUnsignedInteger(buffer, 0/*timestamp msb*/);
@@ -352,7 +352,7 @@ public class EncodingUtils
return result;
}
}
-
+
public static long readTimestamp(ByteBuffer buffer)
{
// Discard msb from AMQ timestamp
@@ -367,166 +367,180 @@ public class EncodingUtils
//
// Input: "a=1" "a=2 c=3" "a=3 c=4 d" "a='four' b='five'" "a=bad"
//
- // Parsing <a=1>...
- // {a=1}
- // Parsing <a=2 c=3>...
- // {a=2, c=3}
- // Parsing <a=3 c=4 d>...
- // {a=3, c=4, d=null}
- // Parsing <a='four' b='five'>...
- // {a=four, b=five}
- // Parsing <a=bad>...
- // java.lang.IllegalArgumentException: a: Invalid integer in <bad> from <a=bad>.
+ // Parsing <a=1>...
+ // {a=1}
+ // Parsing <a=2 c=3>...
+ // {a=2, c=3}
+ // Parsing <a=3 c=4 d>...
+ // {a=3, c=4, d=null}
+ // Parsing <a='four' b='five'>...
+ // {a=four, b=five}
+ // Parsing <a=bad>...
+ // java.lang.IllegalArgumentException: a: Invalid integer in <bad> from <a=bad>.
//
public static FieldTable createFieldTableFromMessageSelector(String selector)
{
- boolean debug = _logger.isDebugEnabled();
-
- // TODO: Doesn't support embedded quotes properly.
- String[] expressions = selector.split(" +");
-
- FieldTable result = new FieldTable();
-
- for(int i = 0; i < expressions.length; i++)
- {
- String expr = expressions[i];
-
- if (debug) _logger.debug("Expression = <" + expr + ">");
-
- int equals = expr.indexOf('=');
-
- if (equals < 0)
- {
- // Existence check
- result.put("S" + expr.trim(),null);
- }
- else
- {
- String key = expr.substring(0,equals).trim();
- String value = expr.substring(equals + 1).trim();
-
- if (debug) _logger.debug("Key = <" + key + ">, Value = <" + value + ">");
-
- if (value.charAt(0) == '\'')
- {
- if (value.charAt(value.length()- 1) != '\'')
- {
- throw new IllegalArgumentException(key + ": Missing quote in <" + value + "> from <" + selector + ">.");
- }
- else
- {
- value = value.substring(1,value.length() - 1);
-
- result.put("S" + key,value);
- }
- }
- else
- {
- try
- {
- int intValue = Integer.parseInt(value);
-
- result.put("i" + key,value);
- }
- catch(NumberFormatException e)
- {
- throw new IllegalArgumentException(key + ": Invalid integer in <" + value + "> from <" + selector + ">.");
-
- }
- }
- }
- }
-
- if (debug) _logger.debug("Field-table created from <" + selector + "> is <" + result + ">");
-
- return(result);
+ boolean debug = _logger.isDebugEnabled();
+
+ // TODO: Doesn't support embedded quotes properly.
+ String[] expressions = selector.split(" +");
+
+ FieldTable result = new FieldTable();
+
+ for (int i = 0; i < expressions.length; i++)
+ {
+ String expr = expressions[i];
+
+ if (debug)
+ {
+ _logger.debug("Expression = <" + expr + ">");
+ }
+
+ int equals = expr.indexOf('=');
+
+ if (equals < 0)
+ {
+ // Existence check
+ result.put("S" + expr.trim(), null);
+ }
+ else
+ {
+ String key = expr.substring(0, equals).trim();
+ String value = expr.substring(equals + 1).trim();
+
+ if (debug)
+ {
+ _logger.debug("Key = <" + key + ">, Value = <" + value + ">");
+ }
+
+ if (value.charAt(0) == '\'')
+ {
+ if (value.charAt(value.length() - 1) != '\'')
+ {
+ throw new IllegalArgumentException(key + ": Missing quote in <" + value + "> from <" + selector + ">.");
+ }
+ else
+ {
+ value = value.substring(1, value.length() - 1);
+
+ result.put("S" + key, value);
+ }
+ }
+ else
+ {
+ try
+ {
+ int intValue = Integer.parseInt(value);
+
+ result.put("i" + key, value);
+ }
+ catch (NumberFormatException e)
+ {
+ throw new IllegalArgumentException(key + ": Invalid integer in <" + value + "> from <" + selector + ">.");
+
+ }
+ }
+ }
+ }
+
+ if (debug)
+ {
+ _logger.debug("Field-table created from <" + selector + "> is <" + result + ">");
+ }
+
+ return (result);
}
- static byte[] hexToByteArray(String id)
- {
- // Should check param for null, long enough for this check, upper-case and trailing char
- String s = (id.charAt(1) == 'x') ? id.substring(2) : id; // strip 0x
+ static byte[] hexToByteArray(String id)
+ {
+ // Should check param for null, long enough for this check, upper-case and trailing char
+ String s = (id.charAt(1) == 'x') ? id.substring(2) : id; // strip 0x
- int len = s.length();
- int byte_len = len / 2;
- byte[] b = new byte[byte_len];
+ int len = s.length();
+ int byte_len = len / 2;
+ byte[] b = new byte[byte_len];
- for(int i = 0; i < byte_len; i++)
- {
- // fixme: refine these repetitive subscript calcs.
- int ch = i * 2;
+ for (int i = 0; i < byte_len; i++)
+ {
+ // fixme: refine these repetitive subscript calcs.
+ int ch = i * 2;
- byte b1 = Byte.parseByte(s.substring(ch,ch + 1),16);
- byte b2 = Byte.parseByte(s.substring(ch + 1,ch + 2),16);
+ byte b1 = Byte.parseByte(s.substring(ch, ch + 1), 16);
+ byte b2 = Byte.parseByte(s.substring(ch + 1, ch + 2), 16);
- b[i] = (byte)(b1 * 16 + b2);
- }
+ b[i] = (byte) (b1 * 16 + b2);
+ }
- return(b);
- }
+ return (b);
+ }
- public static char[] convertToHexCharArray(byte[] from)
- {
- int length = from.length;
- char[] result_buff = new char[length * 2 + 2];
+ public static char[] convertToHexCharArray(byte[] from)
+ {
+ int length = from.length;
+ char[] result_buff = new char[length * 2 + 2];
- result_buff[0] = '0';
- result_buff[1] = 'x';
+ result_buff[0] = '0';
+ result_buff[1] = 'x';
- int bite;
- int dest = 2;
+ int bite;
+ int dest = 2;
- for(int i = 0; i < length; i++)
- {
- bite = from[i];
+ for (int i = 0; i < length; i++)
+ {
+ bite = from[i];
- if (bite < 0) bite += 256;
+ if (bite < 0)
+ {
+ bite += 256;
+ }
- result_buff[dest++] = hex_chars[bite >> 4];
- result_buff[dest++] = hex_chars[bite & 0x0f];
- }
+ result_buff[dest++] = hex_chars[bite >> 4];
+ result_buff[dest++] = hex_chars[bite & 0x0f];
+ }
- return(result_buff);
- }
+ return (result_buff);
+ }
- public static String convertToHexString(byte[] from)
- {
- return(new String(convertToHexCharArray(from)));
- }
+ public static String convertToHexString(byte[] from)
+ {
+ return (new String(convertToHexCharArray(from)));
+ }
- public static String convertToHexString(ByteBuffer bb)
- {
- int size = bb.limit();
+ public static String convertToHexString(ByteBuffer bb)
+ {
+ int size = bb.limit();
- byte[] from = new byte[size];
+ byte[] from = new byte[size];
- for(int i = 0; i < size; i++)
- {
- from[i] = bb.get(i);
- }
+ // Is this not the same.
+ //bb.get(from, 0, size);
+ for (int i = 0; i < size; i++)
+ {
+ from[i] = bb.get(i);
+ }
- return(new String(convertToHexCharArray(from)));
- }
+ return (new String(convertToHexCharArray(from)));
+ }
- public static void main(String[] args)
- {
- for(int i = 0; i < args.length; i++)
- {
- String selector = args[i];
+ public static void main(String[] args)
+ {
+ for (int i = 0; i < args.length; i++)
+ {
+ String selector = args[i];
- System.err.println("Parsing <" + selector + ">...");
+ System.err.println("Parsing <" + selector + ">...");
- try
- {
- System.err.println(createFieldTableFromMessageSelector(selector));
- }
- catch(IllegalArgumentException e)
- {
- System.err.println(e);
- }
- }
- }
+ try
+ {
+ System.err.println(createFieldTableFromMessageSelector(selector));
+ }
+ catch (IllegalArgumentException e)
+ {
+ System.err.println(e);
+ }
+ }
+ }
- private static char hex_chars[] = {'0','1','2','3','4','5','6','7','8','9','a','b','c','d','e','f'};
+ private static char hex_chars[] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
}