diff options
author | Martin Ritchie <ritchiem@apache.org> | 2006-10-03 15:11:24 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2006-10-03 15:11:24 +0000 |
commit | 587542968e8f262b077d22bd4556044a5d65bef9 (patch) | |
tree | cafff79bf70073c2c4c1cd978cab61a810be6a56 | |
parent | a6e80e575a8ca66f0f4cc9bf6ae1ec4b37c073e9 (diff) | |
download | qpid-python-587542968e8f262b077d22bd4556044a5d65bef9.tar.gz |
client/* - Only Create a Threshold Listener if if the acknowledgeMode is NO_ACK
common/*/framing/* - White space changes from tabs to 4 spaces.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@452529 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 204 insertions, 176 deletions
diff --git a/java/client/src/org/apache/qpid/client/AMQSession.java b/java/client/src/org/apache/qpid/client/AMQSession.java index 4768399036..3bc670e609 100644 --- a/java/client/src/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/org/apache/qpid/client/AMQSession.java @@ -220,6 +220,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _channelId = channelId; _messageFactoryRegistry = messageFactoryRegistry; _defaultPrefetch = defaultPrefetch; + if (_acknowledgeMode == NO_ACKNOWLEDGE) + { _queue = new FlowControllingBlockingQueue(_defaultPrefetch, new FlowControllingBlockingQueue.ThresholdListener() { @@ -241,6 +243,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } }); + } + else + { + _queue = new FlowControllingBlockingQueue(_defaultPrefetch,null); + } } AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode) diff --git a/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index ad2ca7b731..89e6968e44 100644 --- a/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -63,11 +63,14 @@ public class FlowControllingBlockingQueue public Object take() throws InterruptedException { Object o = _queue.take(); - synchronized (_listener) + if (_listener != null) { - if (--_count == (_flowControlThreshold - 1)) + synchronized(_listener) { - _listener.underThreshold(_count); + if (--_count == (_flowControlThreshold - 1)) + { + _listener.underThreshold(_count); + } } } return o; @@ -76,12 +79,16 @@ public class FlowControllingBlockingQueue public void add(Object o) { _queue.add(o); - synchronized (_listener) + if (_listener != null) { - if (++_count == _flowControlThreshold) + synchronized(_listener) { - _listener.aboveThreshold(_count); + if (++_count == _flowControlThreshold) + { + _listener.aboveThreshold(_count); + } } } } } + diff --git a/java/common/src/org/apache/qpid/framing/AMQDataBlockEncoder.java b/java/common/src/org/apache/qpid/framing/AMQDataBlockEncoder.java index b0430afac9..9c0ca26dcf 100644 --- a/java/common/src/org/apache/qpid/framing/AMQDataBlockEncoder.java +++ b/java/common/src/org/apache/qpid/framing/AMQDataBlockEncoder.java @@ -20,8 +20,8 @@ package org.apache.qpid.framing; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoSession; -import org.apache.mina.filter.codec.demux.MessageEncoder; import org.apache.mina.filter.codec.ProtocolEncoderOutput; +import org.apache.mina.filter.codec.demux.MessageEncoder; import java.util.HashSet; import java.util.Set; @@ -48,7 +48,7 @@ public class AMQDataBlockEncoder implements MessageEncoder if (_logger.isDebugEnabled()) { - _logger.debug("Encoded frame byte-buffer is '" + EncodingUtils.convertToHexString(buffer) + "'"); + _logger.debug("Encoded frame byte-buffer is '" + EncodingUtils.convertToHexString(buffer) + "'"); } buffer.flip(); diff --git a/java/common/src/org/apache/qpid/framing/EncodingUtils.java b/java/common/src/org/apache/qpid/framing/EncodingUtils.java index 5b35044b25..65deb61dfb 100644 --- a/java/common/src/org/apache/qpid/framing/EncodingUtils.java +++ b/java/common/src/org/apache/qpid/framing/EncodingUtils.java @@ -24,7 +24,7 @@ import java.nio.charset.Charset; public class EncodingUtils { - private static final Logger _logger = Logger.getLogger(EncodingUtils.class); + private static final Logger _logger = Logger.getLogger(EncodingUtils.class); private static final String STRING_ENCODING = "iso8859-15"; @@ -91,7 +91,7 @@ public class EncodingUtils else { // size of the table plus 4 octets for the size - return (int)table.getEncodedSize() + 4; + return (int) table.getEncodedSize() + 4; } } @@ -106,7 +106,7 @@ public class EncodingUtils encodedString[i] = (byte) cha[i]; } // TODO: check length fits in an unsigned byte - writeUnsignedByte(buffer, (short)encodedString.length); + writeUnsignedByte(buffer, (short) encodedString.length); buffer.put(encodedString); } else @@ -179,37 +179,37 @@ public class EncodingUtils public static void writeUnsignedShort(ByteBuffer buffer, int s) { - // TODO: Is this comparison safe? Do I need to cast RHS to long? - if (s < Short.MAX_VALUE) - { - buffer.putShort((short)s); - } - else - { - short sv = (short) s; - buffer.put((byte) (0xFF & (sv >> 8))); - buffer.put((byte)(0xFF & sv)); - } + // TODO: Is this comparison safe? Do I need to cast RHS to long? + if (s < Short.MAX_VALUE) + { + buffer.putShort((short) s); + } + else + { + short sv = (short) s; + buffer.put((byte) (0xFF & (sv >> 8))); + buffer.put((byte) (0xFF & sv)); + } } public static void writeUnsignedInteger(ByteBuffer buffer, long l) { - // TODO: Is this comparison safe? Do I need to cast RHS to long? - if (l < Integer.MAX_VALUE) - { - buffer.putInt((int)l); - } - else - { - int iv = (int) l; - - // FIXME: This *may* go faster if we build this into a local 4-byte array and then - // put the array in a single call. - buffer.put((byte) (0xFF & (iv >> 24))); - buffer.put((byte) (0xFF & (iv >> 16))); - buffer.put((byte) (0xFF & (iv >> 8 ))); - buffer.put((byte) (0xFF & iv)); - } + // TODO: Is this comparison safe? Do I need to cast RHS to long? + if (l < Integer.MAX_VALUE) + { + buffer.putInt((int) l); + } + else + { + int iv = (int) l; + + // FIXME: This *may* go faster if we build this into a local 4-byte array and then + // put the array in a single call. + buffer.put((byte) (0xFF & (iv >> 24))); + buffer.put((byte) (0xFF & (iv >> 16))); + buffer.put((byte) (0xFF & (iv >> 8))); + buffer.put((byte) (0xFF & iv)); + } } public static void writeFieldTableBytes(ByteBuffer buffer, FieldTable table) @@ -231,7 +231,7 @@ public class EncodingUtils { if (values[i]) { - packedValue = (byte)(packedValue | (1 << i)); + packedValue = (byte) (packedValue | (1 << i)); } } @@ -255,7 +255,7 @@ public class EncodingUtils writeUnsignedInteger(buffer, 0); } } - + public static void writeTimestamp(ByteBuffer buffer, long timestamp) { writeUnsignedInteger(buffer, 0/*timestamp msb*/); @@ -352,7 +352,7 @@ public class EncodingUtils return result; } } - + public static long readTimestamp(ByteBuffer buffer) { // Discard msb from AMQ timestamp @@ -367,166 +367,180 @@ public class EncodingUtils // // Input: "a=1" "a=2 c=3" "a=3 c=4 d" "a='four' b='five'" "a=bad" // - // Parsing <a=1>... - // {a=1} - // Parsing <a=2 c=3>... - // {a=2, c=3} - // Parsing <a=3 c=4 d>... - // {a=3, c=4, d=null} - // Parsing <a='four' b='five'>... - // {a=four, b=five} - // Parsing <a=bad>... - // java.lang.IllegalArgumentException: a: Invalid integer in <bad> from <a=bad>. + // Parsing <a=1>... + // {a=1} + // Parsing <a=2 c=3>... + // {a=2, c=3} + // Parsing <a=3 c=4 d>... + // {a=3, c=4, d=null} + // Parsing <a='four' b='five'>... + // {a=four, b=five} + // Parsing <a=bad>... + // java.lang.IllegalArgumentException: a: Invalid integer in <bad> from <a=bad>. // public static FieldTable createFieldTableFromMessageSelector(String selector) { - boolean debug = _logger.isDebugEnabled(); - - // TODO: Doesn't support embedded quotes properly. - String[] expressions = selector.split(" +"); - - FieldTable result = new FieldTable(); - - for(int i = 0; i < expressions.length; i++) - { - String expr = expressions[i]; - - if (debug) _logger.debug("Expression = <" + expr + ">"); - - int equals = expr.indexOf('='); - - if (equals < 0) - { - // Existence check - result.put("S" + expr.trim(),null); - } - else - { - String key = expr.substring(0,equals).trim(); - String value = expr.substring(equals + 1).trim(); - - if (debug) _logger.debug("Key = <" + key + ">, Value = <" + value + ">"); - - if (value.charAt(0) == '\'') - { - if (value.charAt(value.length()- 1) != '\'') - { - throw new IllegalArgumentException(key + ": Missing quote in <" + value + "> from <" + selector + ">."); - } - else - { - value = value.substring(1,value.length() - 1); - - result.put("S" + key,value); - } - } - else - { - try - { - int intValue = Integer.parseInt(value); - - result.put("i" + key,value); - } - catch(NumberFormatException e) - { - throw new IllegalArgumentException(key + ": Invalid integer in <" + value + "> from <" + selector + ">."); - - } - } - } - } - - if (debug) _logger.debug("Field-table created from <" + selector + "> is <" + result + ">"); - - return(result); + boolean debug = _logger.isDebugEnabled(); + + // TODO: Doesn't support embedded quotes properly. + String[] expressions = selector.split(" +"); + + FieldTable result = new FieldTable(); + + for (int i = 0; i < expressions.length; i++) + { + String expr = expressions[i]; + + if (debug) + { + _logger.debug("Expression = <" + expr + ">"); + } + + int equals = expr.indexOf('='); + + if (equals < 0) + { + // Existence check + result.put("S" + expr.trim(), null); + } + else + { + String key = expr.substring(0, equals).trim(); + String value = expr.substring(equals + 1).trim(); + + if (debug) + { + _logger.debug("Key = <" + key + ">, Value = <" + value + ">"); + } + + if (value.charAt(0) == '\'') + { + if (value.charAt(value.length() - 1) != '\'') + { + throw new IllegalArgumentException(key + ": Missing quote in <" + value + "> from <" + selector + ">."); + } + else + { + value = value.substring(1, value.length() - 1); + + result.put("S" + key, value); + } + } + else + { + try + { + int intValue = Integer.parseInt(value); + + result.put("i" + key, value); + } + catch (NumberFormatException e) + { + throw new IllegalArgumentException(key + ": Invalid integer in <" + value + "> from <" + selector + ">."); + + } + } + } + } + + if (debug) + { + _logger.debug("Field-table created from <" + selector + "> is <" + result + ">"); + } + + return (result); } - static byte[] hexToByteArray(String id) - { - // Should check param for null, long enough for this check, upper-case and trailing char - String s = (id.charAt(1) == 'x') ? id.substring(2) : id; // strip 0x + static byte[] hexToByteArray(String id) + { + // Should check param for null, long enough for this check, upper-case and trailing char + String s = (id.charAt(1) == 'x') ? id.substring(2) : id; // strip 0x - int len = s.length(); - int byte_len = len / 2; - byte[] b = new byte[byte_len]; + int len = s.length(); + int byte_len = len / 2; + byte[] b = new byte[byte_len]; - for(int i = 0; i < byte_len; i++) - { - // fixme: refine these repetitive subscript calcs. - int ch = i * 2; + for (int i = 0; i < byte_len; i++) + { + // fixme: refine these repetitive subscript calcs. + int ch = i * 2; - byte b1 = Byte.parseByte(s.substring(ch,ch + 1),16); - byte b2 = Byte.parseByte(s.substring(ch + 1,ch + 2),16); + byte b1 = Byte.parseByte(s.substring(ch, ch + 1), 16); + byte b2 = Byte.parseByte(s.substring(ch + 1, ch + 2), 16); - b[i] = (byte)(b1 * 16 + b2); - } + b[i] = (byte) (b1 * 16 + b2); + } - return(b); - } + return (b); + } - public static char[] convertToHexCharArray(byte[] from) - { - int length = from.length; - char[] result_buff = new char[length * 2 + 2]; + public static char[] convertToHexCharArray(byte[] from) + { + int length = from.length; + char[] result_buff = new char[length * 2 + 2]; - result_buff[0] = '0'; - result_buff[1] = 'x'; + result_buff[0] = '0'; + result_buff[1] = 'x'; - int bite; - int dest = 2; + int bite; + int dest = 2; - for(int i = 0; i < length; i++) - { - bite = from[i]; + for (int i = 0; i < length; i++) + { + bite = from[i]; - if (bite < 0) bite += 256; + if (bite < 0) + { + bite += 256; + } - result_buff[dest++] = hex_chars[bite >> 4]; - result_buff[dest++] = hex_chars[bite & 0x0f]; - } + result_buff[dest++] = hex_chars[bite >> 4]; + result_buff[dest++] = hex_chars[bite & 0x0f]; + } - return(result_buff); - } + return (result_buff); + } - public static String convertToHexString(byte[] from) - { - return(new String(convertToHexCharArray(from))); - } + public static String convertToHexString(byte[] from) + { + return (new String(convertToHexCharArray(from))); + } - public static String convertToHexString(ByteBuffer bb) - { - int size = bb.limit(); + public static String convertToHexString(ByteBuffer bb) + { + int size = bb.limit(); - byte[] from = new byte[size]; + byte[] from = new byte[size]; - for(int i = 0; i < size; i++) - { - from[i] = bb.get(i); - } + // Is this not the same. + //bb.get(from, 0, size); + for (int i = 0; i < size; i++) + { + from[i] = bb.get(i); + } - return(new String(convertToHexCharArray(from))); - } + return (new String(convertToHexCharArray(from))); + } - public static void main(String[] args) - { - for(int i = 0; i < args.length; i++) - { - String selector = args[i]; + public static void main(String[] args) + { + for (int i = 0; i < args.length; i++) + { + String selector = args[i]; - System.err.println("Parsing <" + selector + ">..."); + System.err.println("Parsing <" + selector + ">..."); - try - { - System.err.println(createFieldTableFromMessageSelector(selector)); - } - catch(IllegalArgumentException e) - { - System.err.println(e); - } - } - } + try + { + System.err.println(createFieldTableFromMessageSelector(selector)); + } + catch (IllegalArgumentException e) + { + System.err.println(e); + } + } + } - private static char hex_chars[] = {'0','1','2','3','4','5','6','7','8','9','a','b','c','d','e','f'}; + private static char hex_chars[] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; } |