diff options
author | Rafael H. Schloming <rhs@apache.org> | 2006-09-19 22:06:50 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2006-09-19 22:06:50 +0000 |
commit | 913489deb2ee9dbf44455de5f407ddaf4bd8c540 (patch) | |
tree | 7ea442d6867d0076f1c9ea4f4265664059e7aff5 /java/common | |
download | qpid-python-913489deb2ee9dbf44455de5f407ddaf4bd8c540.tar.gz |
Import of qpid from etp:
URL: https://etp.108.redhat.com/svn/etp/trunk/blaze
Repository Root: https://etp.108.redhat.com/svn/etp
Repository UUID: 06e15bec-b515-0410-bef0-cc27a458cf48
Revision: 608
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@447994 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
102 files changed, 10165 insertions, 0 deletions
diff --git a/java/common/bin/qpid-run b/java/common/bin/qpid-run new file mode 100644 index 0000000000..ef2348d45e --- /dev/null +++ b/java/common/bin/qpid-run @@ -0,0 +1,176 @@ +#!/bin/bash +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +die() { + if [[ $1 = -usage ]]; then + shift + usage=true + else + usage=false + fi + echo "$@" + $usage && echo + $usage && usage + exit 1 +} + +if [ -z "$QPID_HOME" ]; then + die "QPID_HOME must be set" +fi + +program=$(basename $0) +sourced=${BASH_SOURCE[0]} +if [[ -z ${sourced:-''} ]]; then + sourced=$(which qpid-run) || ${QPID_HOME}/bin/qpid-run +fi + +usage() { + echo Usage: $program ... "[-run:<option>]" ... + echo + echo Options: + egrep -B 1 "^\s*#USAGE: " ${sourced} |\ + sed "s/#USAGE:/ /" |\ + sed "s/-run:\(.*\))/-run:\1/" |\ + sed "s/-run:\(.*\)=\*/-run:\1=<value>/" |\ + sed "s/^--$//" +} + +cygwin=false +if [[ "$(uname -a | fgrep Cygwin)" != "" ]]; then + cygwin=true +fi + +export EXTERNAL_CLASSPATH=$CLASSPATH +unset CLASSPATH + +conf=$QPID_HOME/etc/$program.conf +if [ ! -e $conf ]; then + conf=$QPID_HOME/etc/$(basename ${sourced}).conf +fi + +if [ -r $conf ]; then + . $conf +else + die "unable to source $conf" +fi + +declare -a RUN_ARGS JAVA_ARGS +for arg in "$@"; do + if [[ $arg == -run:* ]]; then + RUN_ARGS[${#RUN_ARGS[@]}]="$arg" + else + JAVA_ARGS[${#JAVA_ARGS[@]}]="$arg" + fi +done + +# this defines the default behavior, it may be modified during option +# processing below +DISPATCH() { + if $debug; then + echo "CLASSPATH=${CLASSPATH}" + echo "${COMMAND[@]}" + fi + + exec "${COMMAND[@]}" +} + +exclusive() { + if [ -z "$PREVIOUS_ARGS" ]; then + PREVIOUS_ARGS=$1 + else + PREVIOUS_ARGS+=", $1" + DISPATCH() { + die -usage "you must choose one of: $PREVIOUS_ARGS" + } + fi +} + +debug=false + +for arg in "${RUN_ARGS[@]}"; do + case $arg in + -run:debug) +#USAGE: print the classpath and command before running it + debug=true + ;; + -run:jpda) +#USAGE: adds debugging options to the java command, use +#USAGE: JDPA_TRANSPORT and JPDA_ADDRESS to customize the debugging +#USAGE: behavior and use JPDA_OPTS to override it entirely + if [ -z "$JPDA_OPTS" ]; then + JPDA_OPTS="-Xdebug -Xrunjdwp:transport=${JPDA_TRANSPORT:-dt_socket},address=${JPDA_ADDRESS:-8000},server=y,suspend=n" + fi + QPID_OPTS+=" ${JPDA_OPTS}" + ;; + -run:external-classpath=*) +#USAGE: controls how the CLASSPATH environment variable is used by +#USAGE: this script, value can be one of ignore (the default), first, +#USAGE: last, and only + case $arg in + *=ignore) + # do nothing + ;; + *=first) + CLASSPATH=$EXTERNAL_CLASSPATH:$CLASSPATH + ;; + *=last) + CLASSPATH=$CLASSPATH:$EXTERNAL_CLASSPATH + ;; + *=only) + CLASSPATH=$EXTERNAL_CLASSPATH + ;; + *) + die -usage $(echo $arg | sed "s/=/: invalid value '/")\' + ;; + esac + ;; + -run:print-classpath) +#USAGE: print the classpath + DISPATCH() { + echo $CLASSPATH + } + exclusive $arg + ;; + -run:print-command) +#USAGE: print the command + DISPATCH() { + echo "${COMMAND[@]}" + } + exclusive $arg + ;; + -run:help) +#USAGE: print this message + DISPATCH() { + usage + } + exclusive $arg + ;; + *) + die -usage "unrecognized -run option '$arg'" + ;; + esac +done + +if $cygwin; then + QPID_HOME=$(cygpath -w $QPID_HOME) + CLASSPATH=$(cygpath -w -p $CLASSPATH) + JAVA=$(cygpath -u $JAVA) +fi + +COMMAND=($JAVA $JAVA_VM $JAVA_MEM -DQPID_HOME=$QPID_HOME $JAVA_OPTS $QPID_OPTS "${JAVA_ARGS[@]}") + +DISPATCH diff --git a/java/common/build-module.xml b/java/common/build-module.xml new file mode 100644 index 0000000000..2e7a4ec58a --- /dev/null +++ b/java/common/build-module.xml @@ -0,0 +1,129 @@ +<!-- + - + - Copyright (c) 2006 The Apache Software Foundation + - + - Licensed under the Apache License, Version 2.0 (the "License"); + - you may not use this file except in compliance with the License. + - You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, software + - distributed under the License is distributed on an "AS IS" BASIS, + - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + - See the License for the specific language governing permissions and + - limitations under the License. + - + --> +<project name="AMQ Common" default="build"> + + <import file="../module.xml"/> + + <property name="cluster.asl" value="resources/cluster.asl"/> + <property name="stylesheet" value="stylesheets/framing.xsl"/> + <property name="registry_stylesheet" value="stylesheets/registry.xsl"/> + <property name="registry_template" value="resources/registry.template"/> + <property name="saxon.jar" value="lib/saxon/saxon8.jar"/> + <property name="generated.package" value="org/apache/qpid/framing"/> + <property name="generated.dir" value="${module.precompiled}/${generated.package}"/> + <property name="proto_version" value="${generated.dir}/ProtocolVersionList.java"/> + + <macrodef name="saxon"> + <attribute name="out"/> + <attribute name="src"/> + <attribute name="xsl"/> + <element name="args" implicit="true" optional="true"/> + <sequential> + <java jar="${saxon.jar}" fork="true"> + <arg value="-o"/> + <arg value="@{out}"/> + <arg value="@{src}"/> + <arg value="@{xsl}"/> + <args/> + </java> + </sequential> + </macrodef> + + <macrodef name="amqp"> + <attribute name="ver"/> + <sequential> + <!-- Check for the existence of the AMQP specification file --> + <property name="amqpspecfile-@{ver}" value="${project.root}/../specs/amqp-@{ver}.xml"/> + <available file="${project.root}/../specs/amqp-@{ver}.xml" + property="amqpspecfile.present-@{ver}"/> + <fail unless="amqpspecfile.present-@{ver}" + message="ERROR: AMQP specification file ${project.root}/../specs/amqp-@{ver}.xml not found."/> + + <!-- Read in the file as a set of properties; extract the amqp version --> + <xmlproperty prefix="@{ver}" file="${project.root}/../specs/amqp-@{ver}.xml"/> + <echo>Found AMQP specification file "${project.root}/../specs/amqp-@{ver}.xml"; major=${@{ver}.amqp(major)} minor=${@{ver}.amqp(minor)}</echo> + + <!-- Add the version to the ProtocolVersionList.java file --> + <replaceregexp file="${proto_version}" match=" // !VER!" + replace=",${line.separator} {${@{ver}.amqp(major)}, ${@{ver}.amqp(minor)}} // !VER!" + flags="s" byline="true"/> + <replaceregexp file="${proto_version}" match=" // !VER1!" + replace="{${@{ver}.amqp(major)}, ${@{ver}.amqp(minor)}} // !VER!" + flags="s" byline="true"/> + + <!-- Create directory; generate from specification file --> + <mkdir dir="${generated.dir}_${@{ver}.amqp(major)}_${@{ver}.amqp(minor)}"/> + <saxon out="${generated.dir}_${@{ver}.amqp(major)}_${@{ver}.amqp(minor)}/results.out" + src="${project.root}/../specs/amqp-@{ver}.xml" + xsl="${stylesheet}"> + <arg value="major=${@{ver}.amqp(major)}"/> + <arg value="minor=${@{ver}.amqp(minor)}"/> + <arg value="registry_name=MainRegistry"/> + </saxon> + <!-- --> + <saxon out="${generated.dir}_${@{ver}.amqp(major)}_${@{ver}.amqp(minor)}/cluster.out" + src="${cluster.asl}" + xsl="${stylesheet}"> + <arg value="major=${@{ver}.amqp(major)}"/> + <arg value="minor=${@{ver}.amqp(minor)}"/> + <arg value="registry_name=ClusterRegistry"/> + </saxon> + <saxon out="${generated.dir}_${@{ver}.amqp(major)}_${@{ver}.amqp(minor)}/registry.out" + src="${registry_template}" + xsl="${registry_stylesheet}"> + <arg value="major=${@{ver}.amqp(major)}"/> + <arg value="minor=${@{ver}.amqp(minor)}"/> + </saxon> + </sequential> + </macrodef> + +<!-- <uptodate property="generated" targetfile="${generated.dir}/results.out" + srcfile="${amqp.xml}"/> --> + +<!-- <target name="generate" unless="generated"> --> + <target name="generate"> + <mkdir dir="${generated.dir}"/> + <copy file="resources/ProtocolVersionList.java" tofile="${proto_version}" + overwrite="true"/> + <!-- + NOTE: Set the AMQP version numbers to be supported in this build here. + The last version in this list will be the version returned when a protocol + ProtocolInitiation NAK frame is returned by the broker. Usually this is the + highest or most recent version. + --> + <!-- <amqp ver="0.8"/> + <amqp ver="0.9"/> + <amqp ver="0.10"/> --> + <amqp ver="8.0"/> + +<!-- <saxon out="${generated.dir}/results.out" src="${amqp.xml}" + xsl="${stylesheet}"> + <arg value="asl_base=${asl.base}"/> + <arg value="registry_name=MainRegistry"/> + </saxon> + <saxon out="${generated.dir}/cluster.out" src="${cluster.asl}" + xsl="${stylesheet}"> + <arg value="registry_name=ClusterRegistry"/> + </saxon> + <saxon out="${generated.dir}/registry.out" src="${registry_template}" + xsl="${registry_stylesheet}"/> --> + </target> + + <target name="precompile" depends="generate"/> + +</project> diff --git a/java/common/build-old.xml b/java/common/build-old.xml new file mode 100644 index 0000000000..ccf66af21d --- /dev/null +++ b/java/common/build-old.xml @@ -0,0 +1,98 @@ +<!-- + - + - Copyright (c) 2006 The Apache Software Foundation + - + - Licensed under the Apache License, Version 2.0 (the "License"); + - you may not use this file except in compliance with the License. + - You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, software + - distributed under the License is distributed on an "AS IS" BASIS, + - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + - See the License for the specific language governing permissions and + - limitations under the License. + - + --> +<project name="AMQ Java Framing Layer" default="build"> + <property name="amq.home" value="../.."/> + <path id="amq.home.path"> + <pathelement location="${amq.home}"/> + </path> + + <pathconvert targetos="unix" property="amq.home.fixed" refid="amq.home.path"/> + + <property name="amq.asl" value="${amq.home.fixed}/specs/amqp.xml"/> + <property name="cluster.asl" value="resources/cluster.asl"/> + + <property name="stylesheet" value="stylesheets/framing.xsl"/> + <property name="registry_stylesheet" value="stylesheets/registry.xsl"/> + <property name="registry_template" value="resources/registry.template"/> + <property name="saxon.jar" value="lib/saxon/saxon8.jar"/> + <property name="generated.src" value="generated/java/org.apache.qpid/framing"/> + <property name="static.src" value="src"/> + <property name="resources" value="resources"/> + <property name="base.lib" value="lib"/> + + <path id="project.classpath"> + <fileset dir="${base.lib}"> + <include name="**/*.jar"/> + </fileset> + </path> + + <target name="prepare"> + <mkdir dir="classes"/> + </target> + + <target name="build" depends="regenerate, compile" description="re-generates and compiles static and generated source after cleaning"> + </target> + + <target name="compile" depends="prepare" description="compiles static and generated source"> + <javac destdir="classes" debug="true" deprecation="true" source="1.5"> + <src path="${generated.src}"/> + <src path="${static.src}"/> + <classpath refid="project.classpath"/> + </javac> + </target> + + <target name="regenerate" depends="clean, generate" description="generates code"> + </target> + + <target name="check-generate"> + <uptodate property="generateNotRequired" targetfile="${generated.src}/results.out" srcfile="${amq.asl}"/> + </target> + + <target name="generate" depends="check-generate" unless="${generateNotRequired}" description="generates code"> + <mkdir dir="${generated.src}"/> + <java jar="${saxon.jar}" fork="true"> + <arg value="-o"/> + <arg value="${generated.src}/results.out"/> + <arg value="${amq.asl}"/> + <arg value="${stylesheet}"/> + <arg value="asl_base=${asl.base}"/> + <arg value="registry_name=MainRegistry"/> + </java> + <java jar="${saxon.jar}" fork="true"> + <arg value="-o"/> + <arg value="${generated.src}/cluster.out"/> + <arg value="${cluster.asl}"/> + <arg value="${stylesheet}"/> + <arg value="registry_name=ClusterRegistry"/> + </java> + <java jar="${saxon.jar}" fork="true"> + <arg value="-o"/> + <arg value="${generated.src}/registry.out"/> + <arg value="${registry_template}"/> + <arg value="${registry_stylesheet}"/> + </java> + </target> + + <target name="clean" depends="prepare" description="deletes any products of the compile and generate tasks"> + <delete quiet="true"> + <fileset dir="classes" includes="**/*"/> + <fileset dir="${generated.src}" includes="**/*"/> + </delete> + <mkdir dir="${generated.src}"/> + </target> +</project> diff --git a/java/common/etc/qpid-run.conf b/java/common/etc/qpid-run.conf new file mode 100644 index 0000000000..f4391c6116 --- /dev/null +++ b/java/common/etc/qpid-run.conf @@ -0,0 +1,22 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +BLAZE_LIBS=$(find ${BLAZE_HOME}/lib -name "*.jar" -print | sed -e :a -e '$!N;s/\n/:/;ta' -e 'P;D') + +export JAVA=java \ + JAVA_VM=-server \ + JAVA_MEM=-Xmx1024m \ + CLASSPATH=$BLAZE_LIBS diff --git a/java/common/etc/qpid-run.conf.dev b/java/common/etc/qpid-run.conf.dev new file mode 100644 index 0000000000..71a1ebbd95 --- /dev/null +++ b/java/common/etc/qpid-run.conf.dev @@ -0,0 +1,23 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +QPID_LIBS=$(find ${QPID_HOME} -type d -name "classes" -print | sed -e :a -e '$!N;s/\n/:/;ta' -e 'P;D') +QPID_LIBS=${QPID_LIBS}:$(find $(dirname ${QPID_HOME}) -name "*.jar" -print | sed -e :a -e '$!N;s/\n/:/;ta' -e 'P;D') + +export JAVA=java \ + JAVA_VM=-server \ + JAVA_MEM=-Xmx1024m \ + CLASSPATH=$QPID_LIBS diff --git a/java/common/lib/commons-cli/commons-cli-1.0.jar b/java/common/lib/commons-cli/commons-cli-1.0.jar Binary files differnew file mode 100644 index 0000000000..22a004e14e --- /dev/null +++ b/java/common/lib/commons-cli/commons-cli-1.0.jar diff --git a/java/common/lib/commons-collections/commons-collections-3.1.jar b/java/common/lib/commons-collections/commons-collections-3.1.jar Binary files differnew file mode 100644 index 0000000000..41e230feea --- /dev/null +++ b/java/common/lib/commons-collections/commons-collections-3.1.jar diff --git a/java/common/lib/commons-configuration/commons-configuration-1.2.jar b/java/common/lib/commons-configuration/commons-configuration-1.2.jar Binary files differnew file mode 100644 index 0000000000..574d0ac789 --- /dev/null +++ b/java/common/lib/commons-configuration/commons-configuration-1.2.jar diff --git a/java/common/lib/commons-lang/commons-lang-2.1.jar b/java/common/lib/commons-lang/commons-lang-2.1.jar Binary files differnew file mode 100644 index 0000000000..87b80ab5db --- /dev/null +++ b/java/common/lib/commons-lang/commons-lang-2.1.jar diff --git a/java/common/lib/commons-logging/commons-logging-api.jar b/java/common/lib/commons-logging/commons-logging-api.jar Binary files differnew file mode 100644 index 0000000000..ade9a13c78 --- /dev/null +++ b/java/common/lib/commons-logging/commons-logging-api.jar diff --git a/java/common/lib/commons-logging/commons-logging.jar b/java/common/lib/commons-logging/commons-logging.jar Binary files differnew file mode 100644 index 0000000000..b73a80fab6 --- /dev/null +++ b/java/common/lib/commons-logging/commons-logging.jar diff --git a/java/common/lib/junit/junit-4.0.jar b/java/common/lib/junit/junit-4.0.jar Binary files differnew file mode 100644 index 0000000000..b20406924a --- /dev/null +++ b/java/common/lib/junit/junit-4.0.jar diff --git a/java/common/lib/junit/junit.jar b/java/common/lib/junit/junit.jar Binary files differnew file mode 100644 index 0000000000..674d71e89e --- /dev/null +++ b/java/common/lib/junit/junit.jar diff --git a/java/common/lib/logging-log4j/log4j-1.2.13.jar b/java/common/lib/logging-log4j/log4j-1.2.13.jar Binary files differnew file mode 100644 index 0000000000..dde9972109 --- /dev/null +++ b/java/common/lib/logging-log4j/log4j-1.2.13.jar diff --git a/java/common/lib/mina/mina-core-0.9.5-SNAPSHOT.jar b/java/common/lib/mina/mina-core-0.9.5-SNAPSHOT.jar Binary files differnew file mode 100644 index 0000000000..6fcbb64543 --- /dev/null +++ b/java/common/lib/mina/mina-core-0.9.5-SNAPSHOT.jar diff --git a/java/common/lib/mina/mina-filter-ssl-0.9.5-SNAPSHOT.jar b/java/common/lib/mina/mina-filter-ssl-0.9.5-SNAPSHOT.jar Binary files differnew file mode 100644 index 0000000000..45e0333be1 --- /dev/null +++ b/java/common/lib/mina/mina-filter-ssl-0.9.5-SNAPSHOT.jar diff --git a/java/common/lib/saxon/saxon8.jar b/java/common/lib/saxon/saxon8.jar Binary files differnew file mode 100644 index 0000000000..197ce75c5b --- /dev/null +++ b/java/common/lib/saxon/saxon8.jar diff --git a/java/common/lib/slf4j/slf4j-simple.jar b/java/common/lib/slf4j/slf4j-simple.jar Binary files differnew file mode 100644 index 0000000000..5c6567d11c --- /dev/null +++ b/java/common/lib/slf4j/slf4j-simple.jar diff --git a/java/common/readme.txt b/java/common/readme.txt new file mode 100644 index 0000000000..12841fa08d --- /dev/null +++ b/java/common/readme.txt @@ -0,0 +1,4 @@ +AMQP Common Java API + +Common generated functionality for AMQP Java client and broker. See the +readme in the client and broker directories. diff --git a/java/common/resources/ProtocolVersionList.java b/java/common/resources/ProtocolVersionList.java new file mode 100644 index 0000000000..febf002f08 --- /dev/null +++ b/java/common/resources/ProtocolVersionList.java @@ -0,0 +1,37 @@ +/** + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/** + * This class is autogenerated, do not modify. + */ + +package org.apache.qpid.framing; + +/** + * NOTE: Don't remove the line containing the token VER or VER1 - these are + * markers for code generation. + */ + +public interface ProtocolVersionList +{ + public final int PROTOCOL_MAJOR = 0; + public final int PROTOCOL_MINOR = 1; + public final byte pv[][] = { + // !VER1! + }; +} diff --git a/java/common/resources/cluster.asl b/java/common/resources/cluster.asl new file mode 100644 index 0000000000..cc5507d79d --- /dev/null +++ b/java/common/resources/cluster.asl @@ -0,0 +1,56 @@ +<?xml version="1.0"?> +<!-- + - + - Copyright (c) 2006 The Apache Software Foundation + - + - Licensed under the Apache License, Version 2.0 (the "License"); + - you may not use this file except in compliance with the License. + - You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, software + - distributed under the License is distributed on an "AS IS" BASIS, + - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + - See the License for the specific language governing permissions and + - limitations under the License. + - + --> + +<amqp major="8" minor="0" port="5672" comment="AMQ protocol 0.80"> + +<class name = "cluster" index = "101"> + +<doc> + An extension that allows brokers to communicate in order to + provide a clustered service to clients. +</doc> + +<method name = "join"> + <field name = "broker" type = "shortstr" /> +</method> + +<method name = "membership"> + <field name = "members" type = "longstr" /> +</method> + +<method name = "synch"> +</method> + +<method name = "leave"> + <field name = "broker" type = "shortstr" /> +</method> + +<method name = "suspect"> + <field name = "broker" type = "shortstr" /> +</method> + +<method name = "ping"> + <field name = "broker" type = "shortstr" /> + <field name = "load" type = "long" /> + <field name = "response required" type = "bit" /> +</method> + +</class> + +</amqp> diff --git a/java/common/resources/org/apache/qpid/ssl/qpid.cert b/java/common/resources/org/apache/qpid/ssl/qpid.cert Binary files differnew file mode 100644 index 0000000000..e6702108e6 --- /dev/null +++ b/java/common/resources/org/apache/qpid/ssl/qpid.cert diff --git a/java/common/resources/registry.template b/java/common/resources/registry.template new file mode 100644 index 0000000000..3dd0f2013c --- /dev/null +++ b/java/common/resources/registry.template @@ -0,0 +1,22 @@ +<?xml version="1.0"?> +<!-- + - + - Copyright (c) 2006 The Apache Software Foundation + - + - Licensed under the Apache License, Version 2.0 (the "License"); + - you may not use this file except in compliance with the License. + - You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, software + - distributed under the License is distributed on an "AS IS" BASIS, + - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + - See the License for the specific language governing permissions and + - limitations under the License. + - + --> +<registries> + <registry name="MainRegistry"/> + <registry name="ClusterRegistry"/> +</registries> diff --git a/java/common/src/org/apache/qpid/AMQChannelClosedException.java b/java/common/src/org/apache/qpid/AMQChannelClosedException.java new file mode 100644 index 0000000000..694fe75cda --- /dev/null +++ b/java/common/src/org/apache/qpid/AMQChannelClosedException.java @@ -0,0 +1,31 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid; + +/** + * AMQ channel closed exception. + */ +public class AMQChannelClosedException extends AMQException +{ + public AMQChannelClosedException(int errorCode, String msg) + { + super(errorCode, msg); + } +} + + diff --git a/java/common/src/org/apache/qpid/AMQChannelException.java b/java/common/src/org/apache/qpid/AMQChannelException.java new file mode 100644 index 0000000000..677a4938a0 --- /dev/null +++ b/java/common/src/org/apache/qpid/AMQChannelException.java @@ -0,0 +1,46 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid; + +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.AMQFrame; + +public class AMQChannelException extends AMQException +{ + private final int _classId; + private final int _methodId; + + public AMQChannelException(int errorCode, String msg, int classId, int methodId, Throwable t) + { + super(errorCode, msg, t); + _classId = classId; + _methodId = methodId; + } + + public AMQChannelException(int errorCode, String msg, int classId, int methodId) + { + super(errorCode, msg); + _classId = classId; + _methodId = methodId; + } + + public AMQFrame getCloseFrame(int channel) + { + return ChannelCloseBody.createAMQFrame(channel, getErrorCode(), getMessage(), _classId, _methodId); + } +} diff --git a/java/common/src/org/apache/qpid/AMQConnectionClosedException.java b/java/common/src/org/apache/qpid/AMQConnectionClosedException.java new file mode 100644 index 0000000000..dcf393eb65 --- /dev/null +++ b/java/common/src/org/apache/qpid/AMQConnectionClosedException.java @@ -0,0 +1,31 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid; + +/** + * AMQ channel closed exception. + */ +public class AMQConnectionClosedException extends AMQException +{ + public AMQConnectionClosedException(int errorCode, String msg) + { + super(errorCode, msg); + } +} + + diff --git a/java/common/src/org/apache/qpid/AMQConnectionException.java b/java/common/src/org/apache/qpid/AMQConnectionException.java new file mode 100644 index 0000000000..171af23500 --- /dev/null +++ b/java/common/src/org/apache/qpid/AMQConnectionException.java @@ -0,0 +1,27 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.qpid; + +public class AMQConnectionException extends AMQException +{ + public AMQConnectionException(String message) + { + super(message); + } +} diff --git a/java/common/src/org/apache/qpid/AMQDisconnectedException.java b/java/common/src/org/apache/qpid/AMQDisconnectedException.java new file mode 100644 index 0000000000..616a95bd1b --- /dev/null +++ b/java/common/src/org/apache/qpid/AMQDisconnectedException.java @@ -0,0 +1,31 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid; + +/** + * AMQ disconnected exception. + */ +public class AMQDisconnectedException extends AMQException +{ + public AMQDisconnectedException(String msg) + { + super(msg); + } +} + + diff --git a/java/common/src/org/apache/qpid/AMQException.java b/java/common/src/org/apache/qpid/AMQException.java new file mode 100644 index 0000000000..423cbf8975 --- /dev/null +++ b/java/common/src/org/apache/qpid/AMQException.java @@ -0,0 +1,74 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid; + +import org.apache.log4j.Logger; + +/** + * Generic AMQ exception. + */ +public class AMQException extends Exception +{ + private int _errorCode; + + public AMQException(String message) + { + super(message); + } + + public AMQException(String msg, Throwable t) + { + super(msg, t); + } + + public AMQException(int errorCode, String msg, Throwable t) + { + super(msg + " [error code " + errorCode + ']', t); + _errorCode = errorCode; + } + + public AMQException(int errorCode, String msg) + { + super(msg + " [error code " + errorCode + ']'); + _errorCode = errorCode; + } + + public AMQException(Logger logger, String msg, Throwable t) + { + this(msg, t); + logger.error(getMessage(), this); + } + + public AMQException(Logger logger, String msg) + { + this(msg); + logger.error(getMessage(), this); + } + + public AMQException(Logger logger, int errorCode, String msg) + { + this(errorCode, msg); + logger.error(getMessage(), this); + } + + public int getErrorCode() + { + return _errorCode; + } + +} diff --git a/java/common/src/org/apache/qpid/AMQUndeliveredException.java b/java/common/src/org/apache/qpid/AMQUndeliveredException.java new file mode 100644 index 0000000000..9424fe8eb1 --- /dev/null +++ b/java/common/src/org/apache/qpid/AMQUndeliveredException.java @@ -0,0 +1,41 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid; + +/** + * Generic AMQ exception. + */ +public class AMQUndeliveredException extends AMQException +{ + private Object _bounced; + + public AMQUndeliveredException(int errorCode, String msg, Object bounced) + { + super(errorCode, msg); + + _bounced = bounced; + } + + public Object getUndeliveredMessage() + { + return _bounced; + } + +} + + diff --git a/java/common/src/org/apache/qpid/AMQUnresolvedAddressException.java b/java/common/src/org/apache/qpid/AMQUnresolvedAddressException.java new file mode 100644 index 0000000000..b290e1e4c7 --- /dev/null +++ b/java/common/src/org/apache/qpid/AMQUnresolvedAddressException.java @@ -0,0 +1,26 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid; + +public class AMQUnresolvedAddressException extends AMQException +{ + public AMQUnresolvedAddressException(String message) + { + super(message); + } +} diff --git a/java/common/src/org/apache/qpid/bio/Reader.java b/java/common/src/org/apache/qpid/bio/Reader.java new file mode 100644 index 0000000000..165a323337 --- /dev/null +++ b/java/common/src/org/apache/qpid/bio/Reader.java @@ -0,0 +1,98 @@ +/* + * @(#) $Id: SocketSessionImpl.java 398039 2006-04-28 23:36:27Z proyal $ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.bio; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoHandler; + +import java.io.IOException; +import java.nio.channels.ByteChannel; +import java.nio.channels.ClosedByInterruptException; + +class Reader implements Runnable +{ + private final IoHandler handler; + private final SocketSessionImpl session; + private final ByteChannel channel; + private volatile boolean stopped; + + Reader(IoHandler handler, SocketSessionImpl session) + { + this.handler = handler; + this.session = session; + channel = session.getChannel(); + } + + void stop() + { + stopped = true; + } + + public void run() + { + while (!stopped) + { + try + { + ByteBuffer buffer = ByteBuffer.allocate(session.getReadBufferSize()); + int read = channel.read(buffer.buf()); + if(read > 0) + { + buffer.flip(); + ((SocketFilterChain) session.getFilterChain()).messageReceived(session, buffer); + } + else + { + stopped = true; + } + } + catch (ClosedByInterruptException e) + { + stopped = true; + } + catch (IOException e) + { + if (!stopped) + { + signalException(e); + session.close(); + } + } + catch (Exception e) + { + if (!stopped) + { + signalException(e); + } + } + } + } + + private void signalException(Exception e) + { + try + { + handler.exceptionCaught(session, e); + } + catch (Exception e2) + { + e.printStackTrace(); + } + } +} diff --git a/java/common/src/org/apache/qpid/bio/Sequence.java b/java/common/src/org/apache/qpid/bio/Sequence.java new file mode 100644 index 0000000000..dcaae4d6d7 --- /dev/null +++ b/java/common/src/org/apache/qpid/bio/Sequence.java @@ -0,0 +1,29 @@ +/* + * @(#) $Id: SocketSessionImpl.java 398039 2006-04-28 23:36:27Z proyal $ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.bio; + +class Sequence +{ + private int nextId = 0; + + synchronized int nextId() + { + return nextId++; + } +} diff --git a/java/common/src/org/apache/qpid/bio/SimpleSocketChannel.java b/java/common/src/org/apache/qpid/bio/SimpleSocketChannel.java new file mode 100644 index 0000000000..0495654f73 --- /dev/null +++ b/java/common/src/org/apache/qpid/bio/SimpleSocketChannel.java @@ -0,0 +1,82 @@ +/* + * @(#) $Id: SocketSessionImpl.java 398039 2006-04-28 23:36:27Z proyal $ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.bio; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; + +/** + * A simpler alternative to the non-blocking enabled SocketChannel for + * use with blocking io only. Not threadsafe. + */ +class SimpleSocketChannel implements ByteChannel +{ + private final Socket socket; + private final OutputStream out; + private final InputStream in; + private final byte[] buffer = new byte[2048]; + + SimpleSocketChannel(Socket socket) throws IOException + { + this.socket = socket; + out = socket.getOutputStream(); + in = socket.getInputStream(); + } + + Socket socket() + { + return socket; + } + + public int read(ByteBuffer dst) throws IOException + { + if (dst == null) + { + throw new NullPointerException("Null buffer passed into read"); + } + int read = in.read(buffer, 0, Math.min(buffer.length, dst.limit() - dst.position())); + if (read > 0) + { + dst.put(buffer, 0, read); + } + return read; + } + + public int write(ByteBuffer dst) throws IOException + { + byte[] data = new byte[dst.remaining()]; + dst.get(data); + out.write(data); + return data.length; + } + + public boolean isOpen() + { + return socket.isConnected(); + } + + public void close() throws IOException + { + socket.close(); + } +} diff --git a/java/common/src/org/apache/qpid/bio/SocketAcceptor.java b/java/common/src/org/apache/qpid/bio/SocketAcceptor.java new file mode 100644 index 0000000000..d47a29a047 --- /dev/null +++ b/java/common/src/org/apache/qpid/bio/SocketAcceptor.java @@ -0,0 +1,277 @@ +/* + * @(#) $Id: SocketAcceptor.java 389042 2006-03-27 07:49:41Z trustin $ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.bio; + +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.common.support.BaseIoAcceptor; +import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.channels.ByteChannel; +import java.nio.channels.ServerSocketChannel; +import java.util.*; + +/** + */ +public class SocketAcceptor extends BaseIoAcceptor +{ + private static final Sequence acceptorSeq = new Sequence(); + + private final int id = acceptorSeq.nextId(); + private final String threadName = "SocketAcceptor-" + id; + private final IoServiceConfig defaultConfig = new SocketAcceptorConfig(); + private final Map services = new HashMap();//SocketAddress => SocketBinding + + public SocketAcceptor() + { + } + + /** + * Binds to the specified <code>address</code> and handles incoming connections with the specified + * <code>handler</code>. Backlog value is configured to the value of <code>backlog</code> property. + * + * @throws IOException if failed to bind + */ + public void bind(SocketAddress address, IoHandler handler, IoServiceConfig config) throws IOException + { + if (address == null) + { + throw new NullPointerException("address"); + } + + if (handler == null) + { + throw new NullPointerException("handler"); + } + + if (!(address instanceof InetSocketAddress)) + { + throw new IllegalArgumentException("Unexpected address type: " + address.getClass()); + } + + if (((InetSocketAddress) address).getPort() == 0) + { + throw new IllegalArgumentException("Unsupported port number: 0"); + } + + if (config == null) + { + config = getDefaultConfig(); + } + + SocketBinding service = new SocketBinding(address, handler, config); + synchronized (services) + { + services.put(address, service); + } + service.start(); + } + + public Set getManagedSessions(SocketAddress address) + { + if (address == null) + { + throw new NullPointerException("address"); + } + + SocketBinding service = (SocketBinding) services.get(address); + + if (service == null) + { + throw new IllegalArgumentException("Address not bound: " + address); + } + + return Collections.unmodifiableSet(new HashSet(service.sessions)); + } + + public void unbind(SocketAddress address) + { + if (address == null) + { + throw new NullPointerException("address"); + } + + SocketBinding service; + synchronized (services) + { + service = (SocketBinding) services.remove(address); + } + + if (service == null) + { + throw new IllegalArgumentException("Address not bound: " + address); + } + + try + { + service.unbind(); + } + catch (IOException e) + { + //TODO: handle properly + e.printStackTrace(); + } + } + + public void unbindAll() + { + synchronized (services) + { + for (Iterator i = services.entrySet().iterator(); i.hasNext();) + { + SocketBinding service = (SocketBinding) i.next(); + try + { + service.unbind(); + } + catch (IOException e) + { + //TODO: handle properly + e.printStackTrace(); + } + i.remove(); + } + } + } + + public boolean isBound(SocketAddress address) + { + synchronized (services) + { + return services.containsKey(address); + } + } + + public Set getBoundAddresses() + { + throw new UnsupportedOperationException("getBoundAddresses() not supported by blocking IO Acceptor"); + } + + public IoServiceConfig getDefaultConfig() + { + return defaultConfig; + } + + private class SocketBinding implements Runnable + { + private final SocketAddress address; + private final ServerSocketChannel service; + //private final ServerSocket service; + private final IoServiceConfig config; + private final IoHandler handler; + private final List sessions = new ArrayList(); + private volatile boolean stopped = false; + private Thread runner; + + SocketBinding(SocketAddress address, IoHandler handler, IoServiceConfig config) throws IOException + { + this.address = address; + this.handler = handler; + this.config = config; + + service = ServerSocketChannel.open(); + service.socket().bind(address); + + //service = new ServerSocket(); + //service.bind(address); + } + + void unbind() throws IOException + { + stopped = true; + //shutdown all sessions + for (Iterator i = sessions.iterator(); i.hasNext();) + { + ((SocketSessionImpl) i.next()).close(); + i.remove(); + } + + //close server socket + service.close(); + if (runner != null) + { + try + { + runner.join(); + } + catch (InterruptedException e) + { + //ignore and return + System.err.println("Warning: interrupted on unbind(" + address + ")"); + } + } + } + + void start() + { + runner = new Thread(this); + runner.start(); + } + + public void run() + { + while (!stopped) + { + try + { + accept(); + } + catch (Exception e) + { + //handle this better... + e.printStackTrace(); + } + } + } + + private void accept() throws Exception + { + //accept(new SimpleSocketChannel(service.accept())); + accept(service.accept()); + } + + private void accept(ByteChannel channel) throws Exception + { + //SocketChannel channel; + //start session + SocketSessionImpl session = new SocketSessionImpl(SocketAcceptor.this, + (SocketSessionConfig) defaultConfig.getSessionConfig(), + handler, + channel, + address); + //signal start etc... + sessions.add(session); + + //TODO + //need to set up filter chains somehow... (this is copied from connector...) + getFilterChainBuilder().buildFilterChain(session.getFilterChain()); + config.getFilterChainBuilder().buildFilterChain(session.getFilterChain()); + config.getThreadModel().buildFilterChain(session.getFilterChain()); + ((SocketFilterChain) session.getFilterChain()).sessionCreated(session); + + session.start(); + //not sure if this will work... socket is already opened before the created callback is called... + ((SocketFilterChain) session.getFilterChain()).sessionOpened(session); + } + } +} diff --git a/java/common/src/org/apache/qpid/bio/SocketConnector.java b/java/common/src/org/apache/qpid/bio/SocketConnector.java new file mode 100644 index 0000000000..b107c44726 --- /dev/null +++ b/java/common/src/org/apache/qpid/bio/SocketConnector.java @@ -0,0 +1,150 @@ +/* + * @(#) $Id: SocketConnector.java 389042 2006-03-27 07:49:41Z trustin $ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.bio; + +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.common.support.BaseIoConnector; +import org.apache.mina.common.support.DefaultConnectFuture; +import org.apache.mina.transport.socket.nio.SocketConnectorConfig; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.nio.channels.ByteChannel; +import java.nio.channels.SocketChannel; + +/** + */ +public class SocketConnector extends BaseIoConnector +{ + /** + * @noinspection StaticNonFinalField + */ + private static final Sequence idSequence = new Sequence(); + + private final Object lock = new Object(); + private final String threadName = "SocketConnector-" + idSequence.nextId(); + private final IoServiceConfig defaultConfig = new SocketConnectorConfig(); + private final Set managedSessions = Collections.synchronizedSet(new HashSet()); + + /** + * Create a connector with a single processing thread + */ + public SocketConnector() + { + } + + public IoServiceConfig getDefaultConfig() + { + return defaultConfig; + } + + public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config) + { + return connect(address, null, handler, config); + } + + public ConnectFuture connect(SocketAddress address, SocketAddress localAddress, + IoHandler handler, IoServiceConfig config) + { + if (address == null) + { + throw new NullPointerException("address"); + } + if (handler == null) + { + throw new NullPointerException("handler"); + } + + if (! (address instanceof InetSocketAddress)) + { + throw new IllegalArgumentException("Unexpected address type: " + address.getClass()); + } + if (localAddress != null && !(localAddress instanceof InetSocketAddress)) + { + throw new IllegalArgumentException("Unexpected local address type: " + localAddress.getClass()); + } + if (config == null) + { + config = getDefaultConfig(); + } + + DefaultConnectFuture future = new DefaultConnectFuture(); + try + { + + //Socket socket = new Socket(); + //socket.connect(address); + //SimpleSocketChannel channel = new SimpleSocketChannel(socket); + //SocketAddress serviceAddress = socket.getRemoteSocketAddress(); + + SocketChannel channel = SocketChannel.open(address); + channel.configureBlocking(true); + SocketAddress serviceAddress = channel.socket().getRemoteSocketAddress(); + + + SocketSessionImpl session = newSession(channel, handler, config, channel.socket().getRemoteSocketAddress()); + future.setSession(session); + } + catch (IOException e) + { + future.setException(e); + } + + return future; + } + + private SocketSessionImpl newSession(ByteChannel channel, IoHandler handler, IoServiceConfig config, SocketAddress serviceAddress) + throws IOException + { + SocketSessionImpl session = new SocketSessionImpl(this, + (SocketSessionConfig) config.getSessionConfig(), + handler, + channel, + serviceAddress); + try + { + getFilterChainBuilder().buildFilterChain(session.getFilterChain()); + config.getFilterChainBuilder().buildFilterChain(session.getFilterChain()); + config.getThreadModel().buildFilterChain(session.getFilterChain()); + ((SocketFilterChain) session.getFilterChain()).sessionCreated(session); + + session.start(); + //not sure if this will work... socket is already opened before the created callback is called... + ((SocketFilterChain) session.getFilterChain()).sessionOpened(session); + } + catch (Throwable e) + { + throw (IOException) new IOException("Failed to create a session.").initCause(e); + } + + //TODO: figure out how the managed session are used/ what they are etc. + //session.getManagedSessions().add( session ); + + + return session; + } +} diff --git a/java/common/src/org/apache/qpid/bio/SocketFilterChain.java b/java/common/src/org/apache/qpid/bio/SocketFilterChain.java new file mode 100644 index 0000000000..f00a1535aa --- /dev/null +++ b/java/common/src/org/apache/qpid/bio/SocketFilterChain.java @@ -0,0 +1,62 @@ +/* + * @(#) $Id: SocketFilterChain.java 398039 2006-04-28 23:36:27Z proyal $ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.bio; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoFilter.WriteRequest; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.support.AbstractIoFilterChain; + +import java.io.IOException; +import java.nio.channels.ClosedByInterruptException; + +/** + */ +class SocketFilterChain extends AbstractIoFilterChain +{ + + SocketFilterChain(IoSession parent) + { + super(parent); + } + + protected void doWrite(IoSession session, WriteRequest writeRequest) throws Exception + { + SocketSessionImpl s = (SocketSessionImpl) session; + + //write to socket + try + { + s.getChannel().write(((ByteBuffer) writeRequest.getMessage()).buf()); + + //notify of completion + writeRequest.getFuture().setWritten(true); + } + catch(ClosedByInterruptException e) + { + writeRequest.getFuture().setWritten(false); + } + } + + protected void doClose(IoSession session) throws IOException + { + SocketSessionImpl s = (SocketSessionImpl) session; + s.shutdown(); + } +} diff --git a/java/common/src/org/apache/qpid/bio/SocketSessionImpl.java b/java/common/src/org/apache/qpid/bio/SocketSessionImpl.java new file mode 100644 index 0000000000..d534093533 --- /dev/null +++ b/java/common/src/org/apache/qpid/bio/SocketSessionImpl.java @@ -0,0 +1,421 @@ +/* + * @(#) $Id: SocketSessionImpl.java 398039 2006-04-28 23:36:27Z proyal $ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.bio; + +import org.apache.mina.common.IoFilter.WriteRequest; +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoService; +import org.apache.mina.common.IoSessionConfig; +import org.apache.mina.common.RuntimeIOException; +import org.apache.mina.common.TransportType; +import org.apache.mina.common.support.BaseIoSession; +import org.apache.mina.common.support.BaseIoSessionConfig; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; +import org.apache.mina.transport.socket.nio.SocketSessionConfigImpl; + +import java.io.IOException; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.channels.ByteChannel; +import java.nio.channels.SocketChannel; + +/** + */ +class SocketSessionImpl extends BaseIoSession +{ + private final IoService manager; + private final SocketSessionConfig config; + private final SocketFilterChain filterChain; + private final IoHandler handler; + private final SocketAddress remoteAddress; + private final SocketAddress localAddress; + private final SocketAddress serviceAddress; + private final Socket socket; + private final ByteChannel channel; + private final Reader reader; + private Thread runner; + private int readBufferSize; + + /** + * Creates a new instance. + */ + SocketSessionImpl(IoService manager, + SocketSessionConfig config, + IoHandler handler, + ByteChannel channel, + SocketAddress serviceAddress) throws IOException + { + this.manager = manager; + this.filterChain = new SocketFilterChain(this); + this.handler = handler; + this.channel = channel; + if(channel instanceof SocketChannel) + { + socket = ((SocketChannel) channel).socket(); + } + else if(channel instanceof SimpleSocketChannel) + { + socket = ((SimpleSocketChannel) channel).socket(); + } + else + { + throw new IllegalArgumentException("Unrecognised channel type: " + channel.getClass()); + } + + this.remoteAddress = socket.getRemoteSocketAddress(); + this.localAddress = socket.getLocalSocketAddress(); + this.serviceAddress = serviceAddress; + + this.config = new SessionConfigImpl(config); + + reader = new Reader(handler, this); + } + + void start() + { + //create & start thread for this... + runner = new Thread(reader); + runner.start(); + } + + void shutdown() throws IOException + { + filterChain.sessionClosed( this ); + reader.stop(); + channel.close(); + } + + ByteChannel getChannel() + { + return channel; + } + + protected void write0(WriteRequest writeRequest) + { + filterChain.filterWrite(this, writeRequest); + } + + protected void close0() + { + filterChain.filterClose(this); + super.close0(); + } + + protected void updateTrafficMask() + { + //TODO + } + + public IoService getService() + { + return manager; + } + + public IoSessionConfig getConfig() + { + return config; + } + + public IoFilterChain getFilterChain() + { + return filterChain; + } + + public IoHandler getHandler() + { + return handler; + } + + public int getScheduledWriteRequests() + { + return 0; + } + + public int getScheduledWriteBytes() + { + return 0; + } + + public TransportType getTransportType() + { + return TransportType.SOCKET; + } + + public SocketAddress getRemoteAddress() + { + return remoteAddress; + } + + public SocketAddress getLocalAddress() + { + return localAddress; + } + + public SocketAddress getServiceAddress() + { + return serviceAddress; + } + + int getReadBufferSize() + { + return readBufferSize; + } + + private class SessionConfigImpl extends BaseIoSessionConfig implements SocketSessionConfig + { + SessionConfigImpl() + { + } + + SessionConfigImpl(SocketSessionConfig cfg) + { + setKeepAlive(cfg.isKeepAlive()); + setOobInline(cfg.isOobInline()); + setReceiveBufferSize(cfg.getReceiveBufferSize()); + readBufferSize = cfg.getReceiveBufferSize(); + setReuseAddress(cfg.isReuseAddress()); + setSendBufferSize(cfg.getSendBufferSize()); + setSoLinger(cfg.getSoLinger()); + setTcpNoDelay(cfg.isTcpNoDelay()); + if (getTrafficClass() != cfg.getTrafficClass()) + { + setTrafficClass(cfg.getTrafficClass()); + } + } + + + public boolean isKeepAlive() + { + try + { + return socket.getKeepAlive(); + } + catch (SocketException e) + { + throw new RuntimeIOException(e); + } + } + + public void setKeepAlive(boolean on) + { + try + { + socket.setKeepAlive(on); + } + catch (SocketException e) + { + throw new RuntimeIOException(e); + } + } + + public boolean isOobInline() + { + try + { + return socket.getOOBInline(); + } + catch (SocketException e) + { + throw new RuntimeIOException(e); + } + } + + public void setOobInline(boolean on) + { + try + { + socket.setOOBInline(on); + } + catch (SocketException e) + { + throw new RuntimeIOException(e); + } + } + + public boolean isReuseAddress() + { + try + { + return socket.getReuseAddress(); + } + catch (SocketException e) + { + throw new RuntimeIOException(e); + } + } + + public void setReuseAddress(boolean on) + { + try + { + socket.setReuseAddress(on); + } + catch (SocketException e) + { + throw new RuntimeIOException(e); + } + } + + public int getSoLinger() + { + try + { + return socket.getSoLinger(); + } + catch (SocketException e) + { + throw new RuntimeIOException(e); + } + } + + public void setSoLinger(int linger) + { + try + { + if (linger < 0) + { + socket.setSoLinger(false, 0); + } + else + { + socket.setSoLinger(true, linger); + } + } + catch (SocketException e) + { + throw new RuntimeIOException(e); + } + } + + public boolean isTcpNoDelay() + { + try + { + return socket.getTcpNoDelay(); + } + catch (SocketException e) + { + throw new RuntimeIOException(e); + } + } + + public void setTcpNoDelay(boolean on) + { + try + { + socket.setTcpNoDelay(on); + } + catch (SocketException e) + { + throw new RuntimeIOException(e); + } + } + + public int getTrafficClass() + { + if (SocketSessionConfigImpl.isGetTrafficClassAvailable()) + { + try + { + return socket.getTrafficClass(); + } + catch (SocketException e) + { + throw new RuntimeIOException(e); + } + } + else + { + return 0; + } + } + + public void setTrafficClass(int tc) + { + if (SocketSessionConfigImpl.isSetTrafficClassAvailable()) + { + try + { + socket.setTrafficClass(tc); + } + catch (SocketException e) + { + throw new RuntimeIOException(e); + } + } + } + + public int getSendBufferSize() + { + try + { + return socket.getSendBufferSize(); + } + catch (SocketException e) + { + throw new RuntimeIOException(e); + } + } + + public void setSendBufferSize(int size) + { + if (SocketSessionConfigImpl.isSetSendBufferSizeAvailable()) + { + try + { + socket.setSendBufferSize(size); + } + catch (SocketException e) + { + throw new RuntimeIOException(e); + } + } + } + + public int getReceiveBufferSize() + { + try + { + return socket.getReceiveBufferSize(); + } + catch (SocketException e) + { + throw new RuntimeIOException(e); + } + } + + public void setReceiveBufferSize(int size) + { + if (SocketSessionConfigImpl.isSetReceiveBufferSizeAvailable()) + { + try + { + socket.setReceiveBufferSize(size); + SocketSessionImpl.this.readBufferSize = size; + } + catch (SocketException e) + { + throw new RuntimeIOException(e); + } + } + } + } +} diff --git a/java/common/src/org/apache/qpid/codec/AMQCodecFactory.java b/java/common/src/org/apache/qpid/codec/AMQCodecFactory.java new file mode 100644 index 0000000000..c62befce6b --- /dev/null +++ b/java/common/src/org/apache/qpid/codec/AMQCodecFactory.java @@ -0,0 +1,50 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.codec; + +import org.apache.mina.filter.codec.ProtocolCodecFactory; +import org.apache.mina.filter.codec.ProtocolDecoder; +import org.apache.mina.filter.codec.ProtocolEncoder; + +public class AMQCodecFactory implements ProtocolCodecFactory +{ + private AMQEncoder _encoder = new AMQEncoder(); + + private AMQDecoder _frameDecoder; + + /** + * @param expectProtocolInitiation true if the first frame received is going to be + * a protocol initiation frame, false if it is going to be a standard AMQ data block. + * The former case is used for the broker, which always expects to received the + * protocol initiation first from a newly connected client. + */ + public AMQCodecFactory(boolean expectProtocolInitiation) + { + _frameDecoder = new AMQDecoder(expectProtocolInitiation); + } + + public ProtocolEncoder getEncoder() + { + return _encoder; + } + + public ProtocolDecoder getDecoder() + { + return _frameDecoder; + } +} diff --git a/java/common/src/org/apache/qpid/codec/AMQDecoder.java b/java/common/src/org/apache/qpid/codec/AMQDecoder.java new file mode 100644 index 0000000000..594ae11233 --- /dev/null +++ b/java/common/src/org/apache/qpid/codec/AMQDecoder.java @@ -0,0 +1,96 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.codec; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoSession; +import org.apache.mina.filter.codec.CumulativeProtocolDecoder; +import org.apache.mina.filter.codec.ProtocolDecoderOutput; +import org.apache.qpid.framing.AMQDataBlockDecoder; +import org.apache.qpid.framing.ProtocolInitiation; + +/** + * There is one instance of this class per session. Any changes or configuration done + * at run time to the encoders or decoders only affects decoding/encoding of the + * protocol session data to which is it bound. + * + */ +public class AMQDecoder extends CumulativeProtocolDecoder +{ + private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder(); + + private ProtocolInitiation.Decoder _piDecoder = new ProtocolInitiation.Decoder(); + + private boolean _expectProtocolInitiation; + + public AMQDecoder(boolean expectProtocolInitiation) + { + _expectProtocolInitiation = expectProtocolInitiation; + } + + protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception + { + if (_expectProtocolInitiation) + { + return doDecodePI(session, in, out); + } + else + { + return doDecodeDataBlock(session, in, out); + } + } + + protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception + { + int pos = in.position(); + boolean enoughData = _dataBlockDecoder.decodable(session, in); + in.position(pos); + if (!enoughData) + { + // returning false means it will leave the contents in the buffer and + // call us again when more data has been read + return false; + } + else + { + _dataBlockDecoder.decode(session, in, out); + return true; + } + } + + private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception + { + boolean enoughData = _piDecoder.decodable(session, in); + if (!enoughData) + { + // returning false means it will leave the contents in the buffer and + // call us again when more data has been read + return false; + } + else + { + _piDecoder.decode(session, in, out); + return true; + } + } + + public void setExpectProtocolInitiation(boolean expectProtocolInitiation) + { + _expectProtocolInitiation = expectProtocolInitiation; + } +} diff --git a/java/common/src/org/apache/qpid/codec/AMQEncoder.java b/java/common/src/org/apache/qpid/codec/AMQEncoder.java new file mode 100644 index 0000000000..7d5e8182a6 --- /dev/null +++ b/java/common/src/org/apache/qpid/codec/AMQEncoder.java @@ -0,0 +1,38 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.codec; + +import org.apache.mina.filter.codec.ProtocolEncoder; +import org.apache.mina.filter.codec.ProtocolEncoderOutput; +import org.apache.mina.common.IoSession; +import org.apache.qpid.framing.AMQDataBlockEncoder; + +public class AMQEncoder implements ProtocolEncoder +{ + private AMQDataBlockEncoder _dataBlockEncoder = new AMQDataBlockEncoder(); + + public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception + { + _dataBlockEncoder.encode(session, message, out); + } + + public void dispose(IoSession session) throws Exception + { + + } +} diff --git a/java/common/src/org/apache/qpid/configuration/Configured.java b/java/common/src/org/apache/qpid/configuration/Configured.java new file mode 100644 index 0000000000..cb5e8aff1d --- /dev/null +++ b/java/common/src/org/apache/qpid/configuration/Configured.java @@ -0,0 +1,41 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.configuration; + +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.ElementType; +import java.lang.annotation.Target; + +/** + * Marks a field as being "configured" externally. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.FIELD) +public @interface Configured +{ + /** + * The Commons Configuration path to the configuration element + */ + String path(); + + /** + * The default value to use should the path not be found in the configuration source + */ + String defaultValue(); +} diff --git a/java/common/src/org/apache/qpid/configuration/PropertyException.java b/java/common/src/org/apache/qpid/configuration/PropertyException.java new file mode 100644 index 0000000000..f148ffc0b7 --- /dev/null +++ b/java/common/src/org/apache/qpid/configuration/PropertyException.java @@ -0,0 +1,62 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.configuration; + +import org.apache.qpid.AMQException; +import org.apache.log4j.Logger; + +/** + * Indicates an error parsing a property expansion. + */ +public class PropertyException extends AMQException +{ + public PropertyException(String message) + { + super(message); + } + + public PropertyException(String msg, Throwable t) + { + super(msg, t); + } + + public PropertyException(int errorCode, String msg, Throwable t) + { + super(errorCode, msg, t); + } + + public PropertyException(int errorCode, String msg) + { + super(errorCode, msg); + } + + public PropertyException(Logger logger, String msg, Throwable t) + { + super(logger, msg, t); + } + + public PropertyException(Logger logger, String msg) + { + super(logger, msg); + } + + public PropertyException(Logger logger, int errorCode, String msg) + { + super(logger, errorCode, msg); + } +} diff --git a/java/common/src/org/apache/qpid/configuration/PropertyUtils.java b/java/common/src/org/apache/qpid/configuration/PropertyUtils.java new file mode 100644 index 0000000000..bd4000d3c4 --- /dev/null +++ b/java/common/src/org/apache/qpid/configuration/PropertyUtils.java @@ -0,0 +1,153 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.configuration; + +import java.util.ArrayList; +import java.util.Iterator; + +/** + * Based on code in Apache Ant, this utility class handles property expansion. This + * is most useful in config files and so on. + */ +public class PropertyUtils +{ + /** + * Replaces <code>${xxx}</code> style constructions in the given value + * with the string value of the corresponding data types. Replaces only system + * properties + * + * @param value The string to be scanned for property references. + * May be <code>null</code>, in which case this + * method returns immediately with no effect. + * @return the original string with the properties replaced, or + * <code>null</code> if the original string is <code>null</code>. + * @throws PropertyException if the string contains an opening + * <code>${</code> without a closing + * <code>}</code> + */ + public static String replaceProperties(String value) throws PropertyException + { + if (value == null) + { + return null; + } + + ArrayList<String> fragments = new ArrayList<String>(); + ArrayList<String> propertyRefs = new ArrayList<String>(); + parsePropertyString(value, fragments, propertyRefs); + + StringBuffer sb = new StringBuffer(); + Iterator j = propertyRefs.iterator(); + + for (String fragment : fragments) + { + if (fragment == null) + { + String propertyName = (String) j.next(); + + // try to get it from the project or keys + // Backward compatibility + String replacement = System.getProperty(propertyName); + + if (replacement == null) + { + throw new PropertyException("Property ${" + propertyName + + "} has not been set"); + } + fragment = replacement; + } + sb.append(fragment); + } + + return sb.toString(); + } + + /** + * Default parsing method. Parses the supplied value for properties which are specified + * using ${foo} syntax. $X is left as is, and $$ specifies a single $. + * @param value the property string to parse + * @param fragments is populated with the string fragments. A null means "insert a + * property value here. The number of nulls in the list when populated is equal to the + * size of the propertyRefs list + * @param propertyRefs populated with the property names to be added into the final + * String. + */ + private static void parsePropertyString(String value, ArrayList<String> fragments, + ArrayList<String> propertyRefs) + throws PropertyException + { + int prev = 0; + int pos; + //search for the next instance of $ from the 'prev' position + while ((pos = value.indexOf("$", prev)) >= 0) + { + + //if there was any text before this, add it as a fragment + if (pos > 0) + { + fragments.add(value.substring(prev, pos)); + } + //if we are at the end of the string, we tack on a $ + //then move past it + if (pos == (value.length() - 1)) + { + fragments.add("$"); + prev = pos + 1; + } + else if (value.charAt(pos + 1) != '{') + { + //peek ahead to see if the next char is a property or not + //not a property: insert the char as a literal + if (value.charAt(pos + 1) == '$') + { + // two $ map to one $ + fragments.add("$"); + prev = pos + 2; + } + else + { + // $X maps to $X for all values of X!='$' + fragments.add(value.substring(pos, pos + 2)); + prev = pos + 2; + } + } + else + { + // property found, extract its name or bail on a typo + int endName = value.indexOf('}', pos); + if (endName < 0) + { + throw new PropertyException("Syntax error in property: " + + value); + } + String propertyName = value.substring(pos + 2, endName); + fragments.add(null); + propertyRefs.add(propertyName); + prev = endName + 1; + } + } + //no more $ signs found + //if there is any tail to the file, append it + if (prev < value.length()) + { + fragments.add(value.substring(prev)); + } + } + + +} diff --git a/java/common/src/org/apache/qpid/exchange/ExchangeDefaults.java b/java/common/src/org/apache/qpid/exchange/ExchangeDefaults.java new file mode 100644 index 0000000000..84a5836ff7 --- /dev/null +++ b/java/common/src/org/apache/qpid/exchange/ExchangeDefaults.java @@ -0,0 +1,33 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.exchange; + +public class ExchangeDefaults +{ + public final static String TOPIC_EXCHANGE_NAME = "amq.topic"; + + public final static String TOPIC_EXCHANGE_CLASS = "topic"; + + public final static String DIRECT_EXCHANGE_NAME = "amq.direct"; + + public final static String DIRECT_EXCHANGE_CLASS = "direct"; + + public final static String HEADERS_EXCHANGE_NAME = "amq.match"; + + public final static String HEADERS_EXCHANGE_CLASS = "headers"; +} diff --git a/java/common/src/org/apache/qpid/framing/AMQBody.java b/java/common/src/org/apache/qpid/framing/AMQBody.java new file mode 100644 index 0000000000..fad0450960 --- /dev/null +++ b/java/common/src/org/apache/qpid/framing/AMQBody.java @@ -0,0 +1,35 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public abstract class AMQBody +{ + protected abstract byte getType(); + + /** + * Get the size of the body + * @return unsigned short + */ + protected abstract int getSize(); + + protected abstract void writePayload(ByteBuffer buffer); + + protected abstract void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException; +} diff --git a/java/common/src/org/apache/qpid/framing/AMQDataBlock.java b/java/common/src/org/apache/qpid/framing/AMQDataBlock.java new file mode 100644 index 0000000000..797df391c3 --- /dev/null +++ b/java/common/src/org/apache/qpid/framing/AMQDataBlock.java @@ -0,0 +1,40 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +/** + * A data block represents something that has a size in bytes and the ability to write itself to a byte + * buffer (similar to a byte array). + */ +public abstract class AMQDataBlock implements EncodableAMQDataBlock +{ + /** + * Get the size of buffer needed to store the byte representation of this + * frame. + * @return unsigned integer + */ + public abstract long getSize(); + + /** + * Writes the datablock to the specified buffer. + * @param buffer + */ + public abstract void writePayload(ByteBuffer buffer); +} diff --git a/java/common/src/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/org/apache/qpid/framing/AMQDataBlockDecoder.java new file mode 100644 index 0000000000..3379cc18e9 --- /dev/null +++ b/java/common/src/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -0,0 +1,113 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoSession; +import org.apache.mina.filter.codec.ProtocolDecoderOutput; + +import java.util.HashMap; +import java.util.Map; + +public class AMQDataBlockDecoder +{ + Logger _logger = Logger.getLogger(AMQDataBlockDecoder.class); + + private final Map _supportedBodies = new HashMap(); + + public AMQDataBlockDecoder() + { + _supportedBodies.put(new Byte(AMQMethodBody.TYPE), AMQMethodBodyFactory.getInstance()); + _supportedBodies.put(new Byte(ContentHeaderBody.TYPE), ContentHeaderBodyFactory.getInstance()); + _supportedBodies.put(new Byte(ContentBody.TYPE), ContentBodyFactory.getInstance()); + _supportedBodies.put(new Byte(HeartbeatBody.TYPE), new HeartbeatBodyFactory()); + } + + public boolean decodable(IoSession session, ByteBuffer in) throws AMQFrameDecodingException + { + // type, channel, body size and end byte + if (in.remaining() < (1 + 2 + 4 + 1)) + { + return false; + } + + final byte type = in.get(); + final int channel = in.getUnsignedShort(); + final long bodySize = in.getUnsignedInt(); + + // bodySize can be zero + if (type <= 0 || channel < 0 || bodySize < 0) + { + throw new AMQFrameDecodingException("Undecodable frame: type = " + type + " channel = " + channel + + " bodySize = " + bodySize); + } + + if (in.remaining() < (bodySize + 1)) + { + return false; + } + return true; + } + + private boolean isSupportedFrameType(byte frameType) + { + final boolean result = _supportedBodies.containsKey(new Byte(frameType)); + + if (!result) + { + _logger.warn("AMQDataBlockDecoder does not handle frame type " + frameType); + } + + return result; + } + + protected Object createAndPopulateFrame(ByteBuffer in) + throws AMQFrameDecodingException + { + final byte type = in.get(); + if (!isSupportedFrameType(type)) + { + throw new AMQFrameDecodingException("Unsupported frame type: " + type); + } + final int channel = in.getUnsignedShort(); + final long bodySize = in.getUnsignedInt(); + + BodyFactory bodyFactory = (BodyFactory) _supportedBodies.get(new Byte(type)); + if (bodyFactory == null) + { + throw new AMQFrameDecodingException("Unsupported body type: " + type); + } + AMQFrame frame = new AMQFrame(); + + frame.populateFromBuffer(in, channel, bodySize, bodyFactory); + + byte marker = in.get(); + if ((marker & 0xFF) != 0xCE) + { + throw new AMQFrameDecodingException("End of frame marker not found. Read " + marker + " size=" + bodySize + " type=" + type); + } + return frame; + } + + public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) + throws Exception + { + out.write(createAndPopulateFrame(in)); + } +} diff --git a/java/common/src/org/apache/qpid/framing/AMQDataBlockEncoder.java b/java/common/src/org/apache/qpid/framing/AMQDataBlockEncoder.java new file mode 100644 index 0000000000..b0430afac9 --- /dev/null +++ b/java/common/src/org/apache/qpid/framing/AMQDataBlockEncoder.java @@ -0,0 +1,62 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoSession; +import org.apache.mina.filter.codec.demux.MessageEncoder; +import org.apache.mina.filter.codec.ProtocolEncoderOutput; + +import java.util.HashSet; +import java.util.Set; + +public class AMQDataBlockEncoder implements MessageEncoder +{ + Logger _logger = Logger.getLogger(AMQDataBlockEncoder.class); + + private Set _messageTypes; + + public AMQDataBlockEncoder() + { + _messageTypes = new HashSet(); + _messageTypes.add(EncodableAMQDataBlock.class); + } + + public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception + { + final AMQDataBlock frame = (AMQDataBlock) message; + int frameSize = (int)frame.getSize(); + final ByteBuffer buffer = ByteBuffer.allocate(frameSize); + //buffer.setAutoExpand(true); + frame.writePayload(buffer); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Encoded frame byte-buffer is '" + EncodingUtils.convertToHexString(buffer) + "'"); + } + + buffer.flip(); + out.write(buffer); + } + + public Set getMessageTypes() + { + return _messageTypes; + } +} diff --git a/java/common/src/org/apache/qpid/framing/AMQFrame.java b/java/common/src/org/apache/qpid/framing/AMQFrame.java new file mode 100644 index 0000000000..17f635c06a --- /dev/null +++ b/java/common/src/org/apache/qpid/framing/AMQFrame.java @@ -0,0 +1,73 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock +{ + public int channel; + + public AMQBody bodyFrame; + + public AMQFrame() + { + } + + public AMQFrame(int channel, AMQBody bodyFrame) + { + this.channel = channel; + this.bodyFrame = bodyFrame; + } + + public long getSize() + { + return 1 + 2 + 4 + bodyFrame.getSize() + 1; + } + + public void writePayload(ByteBuffer buffer) + { + buffer.put(bodyFrame.getType()); + // TODO: how does channel get populated + EncodingUtils.writeUnsignedShort(buffer, channel); + EncodingUtils.writeUnsignedInteger(buffer, bodyFrame.getSize()); + bodyFrame.writePayload(buffer); + buffer.put((byte) 0xCE); + } + + /** + * + * @param buffer + * @param channel unsigned short + * @param bodySize unsigned integer + * @param bodyFactory + * @throws AMQFrameDecodingException + */ + public void populateFromBuffer(ByteBuffer buffer, int channel, long bodySize, BodyFactory bodyFactory) + throws AMQFrameDecodingException + { + this.channel = channel; + bodyFrame = bodyFactory.createBody(buffer); + bodyFrame.populateFromBuffer(buffer, bodySize); + } + + public String toString() + { + return "Frame channelId: " + channel + ", bodyFrame: " + String.valueOf(bodyFrame); + } +} diff --git a/java/common/src/org/apache/qpid/framing/AMQFrameDecodingException.java b/java/common/src/org/apache/qpid/framing/AMQFrameDecodingException.java new file mode 100644 index 0000000000..4e8a8c62b1 --- /dev/null +++ b/java/common/src/org/apache/qpid/framing/AMQFrameDecodingException.java @@ -0,0 +1,45 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; + +public class AMQFrameDecodingException extends AMQException +{ + public AMQFrameDecodingException(String message) + { + super(message); + } + + public AMQFrameDecodingException(String message, Throwable t) + { + super(message, t); + } + + public AMQFrameDecodingException(Logger log, String message) + { + super(log, message); + } + + public AMQFrameDecodingException(Logger log, String message, Throwable t) + { + super(log, message, t); + } + +} diff --git a/java/common/src/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/org/apache/qpid/framing/AMQMethodBody.java new file mode 100644 index 0000000000..841295e538 --- /dev/null +++ b/java/common/src/org/apache/qpid/framing/AMQMethodBody.java @@ -0,0 +1,87 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQChannelException; + +public abstract class AMQMethodBody extends AMQBody +{ + public static final byte TYPE = 1; + + /** unsigned short */ + protected abstract int getBodySize(); + + /** + * @return unsigned short + */ + protected abstract int getClazz(); + + /** + * @return unsigned short + */ + protected abstract int getMethod(); + + protected abstract void writeMethodPayload(ByteBuffer buffer); + + protected byte getType() + { + return TYPE; + } + + protected int getSize() + { + return 2 + 2 + getBodySize(); + } + + protected void writePayload(ByteBuffer buffer) + { + EncodingUtils.writeUnsignedShort(buffer, getClazz()); + EncodingUtils.writeUnsignedShort(buffer, getMethod()); + writeMethodPayload(buffer); + } + + protected abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException; + + protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + populateMethodBodyFromBuffer(buffer); + } + + public String toString() + { + StringBuffer buf = new StringBuffer(getClass().toString()); + buf.append(" Class: ").append(getClazz()); + buf.append(" Method: ").append(getMethod()); + return buf.toString(); + } + + /** + * Creates an AMQChannelException for the corresponding body type (a channel exception + * should include the class and method ids of the body it resulted from). + */ + public AMQChannelException getChannelException(int code, String message) + { + return new AMQChannelException(code, message, getClazz(), getMethod()); + } + + public AMQChannelException getChannelException(int code, String message, Throwable cause) + { + return new AMQChannelException(code, message, getClazz(), getMethod(), cause); + } +} diff --git a/java/common/src/org/apache/qpid/framing/AMQMethodBodyFactory.java b/java/common/src/org/apache/qpid/framing/AMQMethodBodyFactory.java new file mode 100644 index 0000000000..97f594061e --- /dev/null +++ b/java/common/src/org/apache/qpid/framing/AMQMethodBodyFactory.java @@ -0,0 +1,43 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; + +public class AMQMethodBodyFactory implements BodyFactory +{ + private static final Logger _log = Logger.getLogger(AMQMethodBodyFactory.class); + + private static final AMQMethodBodyFactory _instance = new AMQMethodBodyFactory(); + + public static AMQMethodBodyFactory getInstance() + { + return _instance; + } + + private AMQMethodBodyFactory() + { + _log.debug("Creating method body factory"); + } + + public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException + { + return MethodBodyDecoderRegistry.get(in.getUnsignedShort(), in.getUnsignedShort()); + } +} diff --git a/java/common/src/org/apache/qpid/framing/AMQProtocolClassException.java b/java/common/src/org/apache/qpid/framing/AMQProtocolClassException.java new file mode 100644 index 0000000000..8a8175bbc8 --- /dev/null +++ b/java/common/src/org/apache/qpid/framing/AMQProtocolClassException.java @@ -0,0 +1,26 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +public class AMQProtocolClassException extends AMQProtocolHeaderException +{ + public AMQProtocolClassException(String message) + { + super(message); + } +}
\ No newline at end of file diff --git a/java/common/src/org/apache/qpid/framing/AMQProtocolHeaderException.java b/java/common/src/org/apache/qpid/framing/AMQProtocolHeaderException.java new file mode 100644 index 0000000000..6809c3d21e --- /dev/null +++ b/java/common/src/org/apache/qpid/framing/AMQProtocolHeaderException.java @@ -0,0 +1,28 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.qpid.AMQException; + +public class AMQProtocolHeaderException extends AMQException +{ + public AMQProtocolHeaderException(String message) + { + super(message); + } +} diff --git a/java/common/src/org/apache/qpid/framing/AMQProtocolInstanceException.java b/java/common/src/org/apache/qpid/framing/AMQProtocolInstanceException.java new file mode 100644 index 0000000000..7f5b26010d --- /dev/null +++ b/java/common/src/org/apache/qpid/framing/AMQProtocolInstanceException.java @@ -0,0 +1,26 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +public class AMQProtocolInstanceException extends AMQProtocolHeaderException +{ + public AMQProtocolInstanceException(String message) + { + super(message); + } +} diff --git a/java/common/src/org/apache/qpid/framing/AMQProtocolVersionException.java b/java/common/src/org/apache/qpid/framing/AMQProtocolVersionException.java new file mode 100644 index 0000000000..4f5677b41a --- /dev/null +++ b/java/common/src/org/apache/qpid/framing/AMQProtocolVersionException.java @@ -0,0 +1,30 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +/** + * Exception that is thrown when the client and server differ on expected protocol version (header) information. + * + */ +public class AMQProtocolVersionException extends AMQProtocolHeaderException +{ + public AMQProtocolVersionException(String message) + { + super(message); + } +} diff --git a/java/common/src/org/apache/qpid/framing/BasicContentHeaderProperties.java b/java/common/src/org/apache/qpid/framing/BasicContentHeaderProperties.java new file mode 100644 index 0000000000..20eafa0be7 --- /dev/null +++ b/java/common/src/org/apache/qpid/framing/BasicContentHeaderProperties.java @@ -0,0 +1,592 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; + +public class BasicContentHeaderProperties implements ContentHeaderProperties +{ + private static final Logger _logger = Logger.getLogger(BasicContentHeaderProperties.class); + + /** + * We store the encoded form when we decode the content header so that if we need to + * write it out without modifying it we can do so without incurring the expense of + * reencoding it + */ + private byte[] _encodedForm; + + /** + * Flag indicating whether the entire content header has been decoded yet + */ + private boolean _decoded = true; + + /** + * We have some optimisations for partial decoding for maximum performance. The headers are used in the broker + * for routing in some cases so we can decode that separately. + */ + private boolean _decodedHeaders = true; + + /** + * We have some optimisations for partial decoding for maximum performance. The content type is used by all + * clients to determine the message type + */ + private boolean _decodedContentType = true; + + private String _contentType; + + private String _encoding; + + private FieldTable _headers; + + private byte _deliveryMode; + + private byte _priority; + + private String _correlationId; + + private String _replyTo; + + private long _expiration; + + private String _messageId; + + private long _timestamp; + + private String _type; + + private String _userId; + + private String _appId; + + private String _clusterId; + + private int _propertyFlags = 0; + + public BasicContentHeaderProperties() + { + } + + public int getPropertyListSize() + { + if (_encodedForm != null) + { + return _encodedForm.length; + } + else + { + int size = 0; + + if ((_propertyFlags & (1 << 15)) > 0) + { + size += EncodingUtils.encodedShortStringLength(_contentType); + } + if ((_propertyFlags & (1 << 14)) > 0) + { + size += EncodingUtils.encodedShortStringLength(_encoding); + } + if ((_propertyFlags & (1 << 13)) > 0) + { + size += EncodingUtils.encodedFieldTableLength(_headers); + } + if ((_propertyFlags & (1 << 12)) > 0) + { + size += 1; + } + if ((_propertyFlags & (1 << 11)) > 0) + { + size += 1; + } + if ((_propertyFlags & (1 << 10)) > 0) + { + size += EncodingUtils.encodedShortStringLength(_correlationId); + } + if ((_propertyFlags & (1 << 9)) > 0) + { + size += EncodingUtils.encodedShortStringLength(_replyTo); + } + if ((_propertyFlags & (1 << 8)) > 0) + { + size += EncodingUtils.encodedShortStringLength(String.valueOf(_expiration)); + } + if ((_propertyFlags & (1 << 7)) > 0) + { + size += EncodingUtils.encodedShortStringLength(_messageId); + } + if ((_propertyFlags & (1 << 6)) > 0) + { + size += 8; + } + if ((_propertyFlags & (1 << 5)) > 0) + { + size += EncodingUtils.encodedShortStringLength(_type); + } + if ((_propertyFlags & (1 << 4)) > 0) + { + size += EncodingUtils.encodedShortStringLength(_userId); + } + if ((_propertyFlags & (1 << 3)) > 0) + { + size += EncodingUtils.encodedShortStringLength(_appId); + } + if ((_propertyFlags & (1 << 2)) > 0) + { + size += EncodingUtils.encodedShortStringLength(_clusterId); + } + return size; + } + } + + private void clearEncodedForm() + { + if (!_decoded && _encodedForm != null) + { + //decode(); + } + _encodedForm = null; + } + + public void setPropertyFlags(int propertyFlags) + { + clearEncodedForm(); + _propertyFlags = propertyFlags; + } + + public int getPropertyFlags() + { + return _propertyFlags; + } + + public void writePropertyListPayload(ByteBuffer buffer) + { + if (_encodedForm != null) + { + buffer.put(_encodedForm); + } + else + { + if ((_propertyFlags & (1 << 15)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, _contentType); + } + if ((_propertyFlags & (1 << 14)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, _encoding); + } + if ((_propertyFlags & (1 << 13)) > 0) + { + EncodingUtils.writeFieldTableBytes(buffer, _headers); + } + if ((_propertyFlags & (1 << 12)) > 0) + { + buffer.put(_deliveryMode); + } + if ((_propertyFlags & (1 << 11)) > 0) + { + buffer.put(_priority); + } + if ((_propertyFlags & (1 << 10)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, _correlationId); + } + if ((_propertyFlags & (1 << 9)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, _replyTo); + } + if ((_propertyFlags & (1 << 8)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, String.valueOf(_expiration)); + } + if ((_propertyFlags & (1 << 7)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, _messageId); + } + if ((_propertyFlags & (1 << 6)) > 0) + { + EncodingUtils.writeUnsignedInteger(buffer, 0/*timestamp msb*/); + EncodingUtils.writeUnsignedInteger(buffer, _timestamp); + } + if ((_propertyFlags & (1 << 5)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, _type); + } + if ((_propertyFlags & (1 << 4)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, _userId); + } + if ((_propertyFlags & (1 << 3)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, _appId); + } + if ((_propertyFlags & (1 << 2)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, _clusterId); + } + } + } + + public void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size) + throws AMQFrameDecodingException + { + _propertyFlags = propertyFlags; + + if (_logger.isDebugEnabled()) + { + _logger.debug("Property flags: " + _propertyFlags); + } + decode(buffer); + /*_encodedForm = new byte[size]; + buffer.get(_encodedForm, 0, size); + _decoded = false; + _decodedHeaders = false; + _decodedContentType = false;*/ + } + + private void decode(ByteBuffer buffer) + { + //ByteBuffer buffer = ByteBuffer.wrap(_encodedForm); + int pos = buffer.position(); + try + { + if ((_propertyFlags & (1 << 15)) > 0) + { + _contentType = EncodingUtils.readShortString(buffer); + } + if ((_propertyFlags & (1 << 14)) > 0) + { + _encoding = EncodingUtils.readShortString(buffer); + } + if ((_propertyFlags & (1 << 13)) > 0) + { + _headers = EncodingUtils.readFieldTable(buffer); + } + if ((_propertyFlags & (1 << 12)) > 0) + { + _deliveryMode = buffer.get(); + } + if ((_propertyFlags & (1 << 11)) > 0) + { + _priority = buffer.get(); + } + if ((_propertyFlags & (1 << 10)) > 0) + { + _correlationId = EncodingUtils.readShortString(buffer); + } + if ((_propertyFlags & (1 << 9)) > 0) + { + _replyTo = EncodingUtils.readShortString(buffer); + } + if ((_propertyFlags & (1 << 8)) > 0) + { + _expiration = Long.parseLong(EncodingUtils.readShortString(buffer)); + } + if ((_propertyFlags & (1 << 7)) > 0) + { + _messageId = EncodingUtils.readShortString(buffer); + } + if ((_propertyFlags & (1 << 6)) > 0) + { + // Discard msb from AMQ timestamp + buffer.getUnsignedInt(); + _timestamp = buffer.getUnsignedInt(); + } + if ((_propertyFlags & (1 << 5)) > 0) + { + _type = EncodingUtils.readShortString(buffer); + } + if ((_propertyFlags & (1 << 4)) > 0) + { + _userId = EncodingUtils.readShortString(buffer); + } + if ((_propertyFlags & (1 << 3)) > 0) + { + _appId = EncodingUtils.readShortString(buffer); + } + if ((_propertyFlags & (1 << 2)) > 0) + { + _clusterId = EncodingUtils.readShortString(buffer); + } + } + catch (AMQFrameDecodingException e) + { + throw new RuntimeException("Error in content header data: " + e); + } + + final int endPos = buffer.position(); + buffer.position(pos); + final int len = endPos - pos; + _encodedForm = new byte[len]; + final int limit = buffer.limit(); + buffer.limit(endPos); + buffer.get(_encodedForm, 0, len); + buffer.limit(limit); + buffer.position(endPos); + _decoded = true; + } + + + private void decodeUpToHeaders() + { + ByteBuffer buffer = ByteBuffer.wrap(_encodedForm); + try + { + if ((_propertyFlags & (1 << 15)) > 0) + { + byte length = buffer.get(); + buffer.skip(length); + } + if ((_propertyFlags & (1 << 14)) > 0) + { + byte length = buffer.get(); + buffer.skip(length); + } + if ((_propertyFlags & (1 << 13)) > 0) + { + _headers = EncodingUtils.readFieldTable(buffer); + } + _decodedHeaders = true; + } + catch (AMQFrameDecodingException e) + { + throw new RuntimeException("Error in content header data: " + e); + } + } + + private void decodeUpToContentType() + { + ByteBuffer buffer = ByteBuffer.wrap(_encodedForm); + + if ((_propertyFlags & (1 << 15)) > 0) + { + _contentType = EncodingUtils.readShortString(buffer); + } + + _decodedContentType = true; + } + + private void decodeIfNecessary() + { + if (!_decoded) + { + //decode(); + } + } + + private void decodeHeadersIfNecessary() + { + if (!_decoded && !_decodedHeaders) + { + decodeUpToHeaders(); + } + } + + private void decodeContentTypeIfNecessary() + { + if (!_decoded && !_decodedContentType) + { + decodeUpToContentType(); + } + } + public String getContentType() + { + decodeContentTypeIfNecessary(); + return _contentType; + } + + public void setContentType(String contentType) + { + clearEncodedForm(); + _propertyFlags |= (1 << 15); + _contentType = contentType; + } + + public String getEncoding() + { + decodeIfNecessary(); + return _encoding; + } + + public void setEncoding(String encoding) + { + clearEncodedForm(); + _propertyFlags |= (1 << 14); + _encoding = encoding; + } + + public FieldTable getHeaders() + { + decodeHeadersIfNecessary(); + return _headers; + } + + public void setHeaders(FieldTable headers) + { + clearEncodedForm(); + _propertyFlags |= (1 << 13); + _headers = headers; + } + + public byte getDeliveryMode() + { + decodeIfNecessary(); + return _deliveryMode; + } + + public void setDeliveryMode(byte deliveryMode) + { + clearEncodedForm(); + _propertyFlags |= (1 << 12); + _deliveryMode = deliveryMode; + } + + public byte getPriority() + { + decodeIfNecessary(); + return _priority; + } + + public void setPriority(byte priority) + { + clearEncodedForm(); + _propertyFlags |= (1 << 11); + _priority = priority; + } + + public String getCorrelationId() + { + decodeIfNecessary(); + return _correlationId; + } + + public void setCorrelationId(String correlationId) + { + clearEncodedForm(); + _propertyFlags |= (1 << 10); + _correlationId = correlationId; + } + + public String getReplyTo() + { + decodeIfNecessary(); + return _replyTo; + } + + public void setReplyTo(String replyTo) + { + clearEncodedForm(); + _propertyFlags |= (1 << 9); + _replyTo = replyTo; + } + + public long getExpiration() + { + decodeIfNecessary(); + return _expiration; + } + + public void setExpiration(long expiration) + { + clearEncodedForm(); + _propertyFlags |= (1 << 8); + _expiration = expiration; + } + + + public String getMessageId() + { + decodeIfNecessary(); + return _messageId; + } + + public void setMessageId(String messageId) + { + clearEncodedForm(); + _propertyFlags |= (1 << 7); + _messageId = messageId; + } + + public long getTimestamp() + { + decodeIfNecessary(); + return _timestamp; + } + + public void setTimestamp(long timestamp) + { + clearEncodedForm(); + _propertyFlags |= (1 << 6); + _timestamp = timestamp; + } + + public String getType() + { + decodeIfNecessary(); + return _type; + } + + public void setType(String type) + { + clearEncodedForm(); + _propertyFlags |= (1 << 5); + _type = type; + } + + public String getUserId() + { + decodeIfNecessary(); + return _userId; + } + + public void setUserId(String userId) + { + clearEncodedForm(); + _propertyFlags |= (1 << 4); + _userId = userId; + } + + public String getAppId() + { + decodeIfNecessary(); + return _appId; + } + + public void setAppId(String appId) + { + clearEncodedForm(); + _propertyFlags |= (1 << 3); + _appId = appId; + } + + public String getClusterId() + { + decodeIfNecessary(); + return _clusterId; + } + + public void setClusterId(String clusterId) + { + clearEncodedForm(); + _propertyFlags |= (1 << 2); + _clusterId = clusterId; + } + + public String toString() + { + return "reply-to = " + _replyTo + " propertyFlags = " + _propertyFlags; + } +} diff --git a/java/common/src/org/apache/qpid/framing/BodyFactory.java b/java/common/src/org/apache/qpid/framing/BodyFactory.java new file mode 100644 index 0000000000..cd40558de8 --- /dev/null +++ b/java/common/src/org/apache/qpid/framing/BodyFactory.java @@ -0,0 +1,28 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +/** + * Any class that is capable of turning a stream of bytes into an AMQ structure must implement this interface. + */ +public interface BodyFactory +{ + AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException; +} diff --git a/java/common/src/org/apache/qpid/framing/CompositeAMQDataBlock.java b/java/common/src/org/apache/qpid/framing/CompositeAMQDataBlock.java new file mode 100644 index 0000000000..35e29aa064 --- /dev/null +++ b/java/common/src/org/apache/qpid/framing/CompositeAMQDataBlock.java @@ -0,0 +1,100 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock +{ + private ByteBuffer _encodedBlock; + + private AMQDataBlock[] _blocks; + + public CompositeAMQDataBlock(AMQDataBlock[] blocks) + { + _blocks = blocks; + } + + /** + * The encoded block will be logically first before the AMQDataBlocks which are encoded + * into the buffer afterwards. + * @param encodedBlock already-encoded data + * @param blocks some blocks to be encoded. + */ + public CompositeAMQDataBlock(ByteBuffer encodedBlock, AMQDataBlock[] blocks) + { + this(blocks); + _encodedBlock = encodedBlock; + } + + public AMQDataBlock[] getBlocks() + { + return _blocks; + } + + public ByteBuffer getEncodedBlock() + { + return _encodedBlock; + } + + public long getSize() + { + long frameSize = 0; + for (int i = 0; i < _blocks.length; i++) + { + frameSize += _blocks[i].getSize(); + } + if (_encodedBlock != null) + { + _encodedBlock.rewind(); + frameSize += _encodedBlock.remaining(); + } + return frameSize; + } + + public void writePayload(ByteBuffer buffer) + { + if (_encodedBlock != null) + { + buffer.put(_encodedBlock); + } + for (int i = 0; i < _blocks.length; i++) + { + _blocks[i].writePayload(buffer); + } + } + + public String toString() + { + if (_blocks == null) + { + return "No blocks contained in composite frame"; + } + else + { + StringBuilder buf = new StringBuilder(this.getClass().getName()); + buf.append("{encodedBlock=").append(_encodedBlock); + for (int i = 0 ; i < _blocks.length; i++) + { + buf.append(" ").append(i).append("=[").append(_blocks[i].toString()).append("]"); + } + buf.append("}"); + return buf.toString(); + } + } +} diff --git a/java/common/src/org/apache/qpid/framing/ContentBody.java b/java/common/src/org/apache/qpid/framing/ContentBody.java new file mode 100644 index 0000000000..a345d1d225 --- /dev/null +++ b/java/common/src/org/apache/qpid/framing/ContentBody.java @@ -0,0 +1,64 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public class ContentBody extends AMQBody +{ + public static final byte TYPE = 3; + + public ByteBuffer payload; + + protected byte getType() + { + return TYPE; + } + + public int getSize() + { + return (payload == null?0:payload.limit()); + } + + public void writePayload(ByteBuffer buffer) + { + if (payload != null) + { + ByteBuffer copy = payload.duplicate(); + buffer.put(copy.rewind()); + } + } + + protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + if (size > 0) + { + payload = buffer.slice(); + payload.limit((int)size); + buffer.skip((int)size); + } + } + + public static AMQFrame createAMQFrame(int channelId, ContentBody body) + { + final AMQFrame frame = new AMQFrame(); + frame.channel = channelId; + frame.bodyFrame = body; + return frame; + } +} diff --git a/java/common/src/org/apache/qpid/framing/ContentBodyFactory.java b/java/common/src/org/apache/qpid/framing/ContentBodyFactory.java new file mode 100644 index 0000000000..1d6b72ce76 --- /dev/null +++ b/java/common/src/org/apache/qpid/framing/ContentBodyFactory.java @@ -0,0 +1,44 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; + +public class ContentBodyFactory implements BodyFactory +{ + private static final Logger _log = Logger.getLogger(AMQMethodBodyFactory.class); + + private static final ContentBodyFactory _instance = new ContentBodyFactory(); + + public static ContentBodyFactory getInstance() + { + return _instance; + } + + private ContentBodyFactory() + { + _log.debug("Creating content body factory"); + } + + public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException + { + return new ContentBody(); + } +} + diff --git a/java/common/src/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/org/apache/qpid/framing/ContentHeaderBody.java new file mode 100644 index 0000000000..35ce107831 --- /dev/null +++ b/java/common/src/org/apache/qpid/framing/ContentHeaderBody.java @@ -0,0 +1,112 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public class ContentHeaderBody extends AMQBody +{ + public static final byte TYPE = 2; + + public int classId; + + public int weight; + + /** unsigned long but java can't handle that anyway when allocating byte array */ + public long bodySize; + + /** must never be null */ + public ContentHeaderProperties properties; + + public ContentHeaderBody() + { + } + + public ContentHeaderBody(ContentHeaderProperties props, int classId) + { + properties = props; + this.classId = classId; + } + + public ContentHeaderBody(int classId, int weight, ContentHeaderProperties props, long bodySize) + { + this(props, classId); + this.weight = weight; + this.bodySize = bodySize; + } + + protected byte getType() + { + return TYPE; + } + + protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + classId = buffer.getUnsignedShort(); + weight = buffer.getUnsignedShort(); + bodySize = buffer.getLong(); + int propertyFlags = buffer.getUnsignedShort(); + ContentHeaderPropertiesFactory factory = ContentHeaderPropertiesFactory.getInstance(); + properties = factory.createContentHeaderProperties(classId, propertyFlags, buffer, (int)size - 14); + } + + /** + * Helper method that is used currently by the persistence layer (by BDB at the moment). + * @param buffer + * @param size + * @return + * @throws AMQFrameDecodingException + */ + public static ContentHeaderBody createFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + ContentHeaderBody body = new ContentHeaderBody(); + body.populateFromBuffer(buffer, size); + return body; + } + + public int getSize() + { + return 2 + 2 + 8 + 2 + properties.getPropertyListSize(); + } + + public void writePayload(ByteBuffer buffer) + { + EncodingUtils.writeUnsignedShort(buffer, classId); + EncodingUtils.writeUnsignedShort(buffer, weight); + buffer.putLong(bodySize); + EncodingUtils.writeUnsignedShort(buffer, properties.getPropertyFlags()); + properties.writePropertyListPayload(buffer); + } + + public static AMQFrame createAMQFrame(int channelId, int classId, int weight, BasicContentHeaderProperties properties, + long bodySize) + { + final AMQFrame frame = new AMQFrame(); + frame.channel = channelId; + frame.bodyFrame = new ContentHeaderBody(classId, weight, properties, bodySize); + return frame; + } + + public static AMQFrame createAMQFrame(int channelId, ContentHeaderBody body) + { + final AMQFrame frame = new AMQFrame(); + frame.channel = channelId; + frame.bodyFrame = body; + return frame; + } +} diff --git a/java/common/src/org/apache/qpid/framing/ContentHeaderBodyFactory.java b/java/common/src/org/apache/qpid/framing/ContentHeaderBodyFactory.java new file mode 100644 index 0000000000..236c5094fc --- /dev/null +++ b/java/common/src/org/apache/qpid/framing/ContentHeaderBodyFactory.java @@ -0,0 +1,47 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; + +public class ContentHeaderBodyFactory implements BodyFactory +{ + private static final Logger _log = Logger.getLogger(AMQMethodBodyFactory.class); + + private static final ContentHeaderBodyFactory _instance = new ContentHeaderBodyFactory(); + + public static ContentHeaderBodyFactory getInstance() + { + return _instance; + } + + private ContentHeaderBodyFactory() + { + _log.debug("Creating content header body factory"); + } + + public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException + { + // all content headers are the same - it is only the properties that differ. + // the content header body further delegates construction of properties + return new ContentHeaderBody(); + } + + +} diff --git a/java/common/src/org/apache/qpid/framing/ContentHeaderProperties.java b/java/common/src/org/apache/qpid/framing/ContentHeaderProperties.java new file mode 100644 index 0000000000..65b629bf17 --- /dev/null +++ b/java/common/src/org/apache/qpid/framing/ContentHeaderProperties.java @@ -0,0 +1,55 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +/** + * There will be an implementation of this interface for each content type. All content types have associated + * header properties and this provides a way to encode and decode them. + */ +public interface ContentHeaderProperties +{ + /** + * Writes the property list to the buffer, in a suitably encoded form. + * @param buffer The buffer to write to + */ + void writePropertyListPayload(ByteBuffer buffer); + + /** + * Populates the properties from buffer. + * @param buffer The buffer to read from. + * @param propertyFlags he property flags. + * @throws AMQFrameDecodingException when the buffer does not contain valid data + */ + void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size) + throws AMQFrameDecodingException; + + /** + * @return the size of the encoded property list in bytes. + */ + int getPropertyListSize(); + + /** + * Gets the property flags. Property flags indicate which properties are set in the list. The + * position and meaning of each flag is defined in the protocol specification for the particular + * content type with which these properties are associated. + * @return flags + */ + int getPropertyFlags(); +} diff --git a/java/common/src/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/java/common/src/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java new file mode 100644 index 0000000000..ea1b0d6ef5 --- /dev/null +++ b/java/common/src/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java @@ -0,0 +1,51 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public class ContentHeaderPropertiesFactory +{ + private static final ContentHeaderPropertiesFactory _instance = new ContentHeaderPropertiesFactory(); + + public static ContentHeaderPropertiesFactory getInstance() + { + return _instance; + } + + private ContentHeaderPropertiesFactory() + { + } + + public ContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags, + ByteBuffer buffer, int size) + throws AMQFrameDecodingException + { + ContentHeaderProperties properties; + switch (classId) + { + case BasicConsumeBody.CLASS_ID: + properties = new BasicContentHeaderProperties(); + break; + default: + throw new AMQFrameDecodingException("Unsupport content header class id: " + classId); + } + properties.populatePropertiesFromBuffer(buffer, propertyFlags, size); + return properties; + } +} diff --git a/java/common/src/org/apache/qpid/framing/EncodableAMQDataBlock.java b/java/common/src/org/apache/qpid/framing/EncodableAMQDataBlock.java new file mode 100644 index 0000000000..3d493979eb --- /dev/null +++ b/java/common/src/org/apache/qpid/framing/EncodableAMQDataBlock.java @@ -0,0 +1,32 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +/** + * Marker interface to indicate to MINA that a data block should be encoded with the + * single encoder/decoder that we have defined. + * + * Note that due to a bug in MINA all classes must directly implement this interface, even if + * a superclass implements it. + * TODO: fix MINA so that this is not necessary + * + */ +public interface EncodableAMQDataBlock +{ + +} diff --git a/java/common/src/org/apache/qpid/framing/EncodingUtils.java b/java/common/src/org/apache/qpid/framing/EncodingUtils.java new file mode 100644 index 0000000000..f2f7a3b7a2 --- /dev/null +++ b/java/common/src/org/apache/qpid/framing/EncodingUtils.java @@ -0,0 +1,519 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; + +import java.nio.charset.Charset; + +public class EncodingUtils +{ + private static final Logger _logger = Logger.getLogger(EncodingUtils.class); + + private static final String STRING_ENCODING = "iso8859-15"; + + private static final Charset _charset = Charset.forName("iso8859-15"); + + public static final int SIZEOF_UNSIGNED_SHORT = 2; + public static final int SIZEOF_UNSIGNED_INT = 4; + + public static int encodedShortStringLength(String s) + { + if (s == null) + { + return 1; + } + else + { + return (short) (1 + s.length()); + } + } + + public static int encodedLongStringLength(String s) + { + if (s == null) + { + return 4; + } + else + { + return 4 + s.length(); + } + } + + public static int encodedLongStringLength(char[] s) + { + if (s == null) + { + return 4; + } + else + { + return 4 + s.length; + } + } + + public static int encodedLongstrLength(byte[] bytes) + { + if (bytes == null) + { + return 4; + } + else + { + return 4 + bytes.length; + } + } + + public static int encodedFieldTableLength(FieldTable table) + { + if (table == null) + { + // size is encoded as 4 octets + return 4; + } + else + { + // size of the table plus 4 octets for the size + return (int)table.getEncodedSize() + 4; + } + } + + public static void writeShortStringBytes(ByteBuffer buffer, String s) + { + if (s != null) + { + byte[] encodedString = new byte[s.length()]; + char[] cha = s.toCharArray(); + for (int i = 0; i < cha.length; i++) + { + encodedString[i] = (byte) cha[i]; + } + // TODO: check length fits in an unsigned byte + writeUnsignedByte(buffer, (short)encodedString.length); + buffer.put(encodedString); + } + else + { + // really writing out unsigned byte + buffer.put((byte) 0); + } + } + + public static void writeLongStringBytes(ByteBuffer buffer, String s) + { + assert s == null || s.length() <= 0xFFFE; + if (s != null) + { + int len = s.length(); + writeUnsignedInteger(buffer, s.length()); + byte[] encodedString = new byte[len]; + char[] cha = s.toCharArray(); + for (int i = 0; i < cha.length; i++) + { + encodedString[i] = (byte) cha[i]; + } + buffer.put(encodedString); + } + else + { + writeUnsignedInteger(buffer, 0); + } + } + + public static void writeLongStringBytes(ByteBuffer buffer, char[] s) + { + assert s == null || s.length <= 0xFFFE; + if (s != null) + { + int len = s.length; + writeUnsignedInteger(buffer, s.length); + byte[] encodedString = new byte[len]; + for (int i = 0; i < s.length; i++) + { + encodedString[i] = (byte) s[i]; + } + buffer.put(encodedString); + } + else + { + writeUnsignedInteger(buffer, 0); + } + } + + public static void writeLongStringBytes(ByteBuffer buffer, byte[] bytes) + { + assert bytes == null || bytes.length <= 0xFFFE; + if (bytes != null) + { + writeUnsignedInteger(buffer, bytes.length); + buffer.put(bytes); + } + else + { + writeUnsignedInteger(buffer, 0); + } + } + + public static void writeUnsignedByte(ByteBuffer buffer, short b) + { + byte bv = (byte) b; + buffer.put(bv); + } + + public static void writeUnsignedShort(ByteBuffer buffer, int s) + { + // TODO: Is this comparison safe? Do I need to cast RHS to long? + if (s < Short.MAX_VALUE) + { + buffer.putShort((short)s); + } + else + { + short sv = (short) s; + buffer.put((byte) (0xFF & (sv >> 8))); + buffer.put((byte)(0xFF & sv)); + } + } + + public static void writeUnsignedInteger(ByteBuffer buffer, long l) + { + // TODO: Is this comparison safe? Do I need to cast RHS to long? + if (l < Integer.MAX_VALUE) + { + buffer.putInt((int)l); + } + else + { + int iv = (int) l; + + // FIXME: This *may* go faster if we build this into a local 4-byte array and then + // put the array in a single call. + buffer.put((byte) (0xFF & (iv >> 24))); + buffer.put((byte) (0xFF & (iv >> 16))); + buffer.put((byte) (0xFF & (iv >> 8 ))); + buffer.put((byte) (0xFF & iv)); + } + } + + public static void writeFieldTableBytes(ByteBuffer buffer, FieldTable table) + { + if (table != null) + { + table.writeToBuffer(buffer); + } + else + { + EncodingUtils.writeUnsignedInteger(buffer, 0); + } + } + + public static void writeBooleans(ByteBuffer buffer, boolean[] values) + { + byte packedValue = 0; + for (int i = 0; i < values.length; i++) + { + if (values[i]) + { + packedValue = (byte)(packedValue | (1 << i)); + } + } + + buffer.put(packedValue); + } + + /** + * This is used for writing longstrs. + * @param buffer + * @param data + */ + public static void writeLongstr(ByteBuffer buffer, byte[] data) + { + if (data != null) + { + writeUnsignedInteger(buffer, data.length); + buffer.put(data); + } + else + { + writeUnsignedInteger(buffer, 0); + } + } + + public static boolean[] readBooleans(ByteBuffer buffer) + { + byte packedValue = buffer.get(); + boolean[] result = new boolean[8]; + + for (int i = 0; i < 8; i++) + { + result[i] = ((packedValue & (1 << i)) != 0); + } + return result; + } + + public static FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException + { + long length = buffer.getUnsignedInt(); + if (length == 0) + { + return null; + } + else + { + return new FieldTable(buffer, length); + } + } + + public static String readShortString(ByteBuffer buffer) + { + short length = buffer.getUnsigned(); + if (length == 0) + { + return null; + } + else + { + // this may seem rather odd to declare two array but testing has shown + // that constructing a string from a byte array is 5 (five) times slower + // than constructing one from a char array. + // this approach here is valid since we know that all the chars are + // ASCII (0-127) + byte[] stringBytes = new byte[length]; + buffer.get(stringBytes, 0, length); + char[] stringChars = new char[length]; + for (int i = 0; i < stringChars.length; i++) + { + stringChars[i] = (char) stringBytes[i]; + } + + return new String(stringChars); + } + } + + public static String readLongString(ByteBuffer buffer) + { + long length = buffer.getUnsignedInt(); + if (length == 0) + { + return null; + } + else + { + // this may seem rather odd to declare two array but testing has shown + // that constructing a string from a byte array is 5 (five) times slower + // than constructing one from a char array. + // this approach here is valid since we know that all the chars are + // ASCII (0-127) + byte[] stringBytes = new byte[(int)length]; + buffer.get(stringBytes, 0, (int)length); + char[] stringChars = new char[(int)length]; + for (int i = 0; i < stringChars.length; i++) + { + stringChars[i] = (char) stringBytes[i]; + } + return new String(stringChars); + } + } + + public static byte[] readLongstr(ByteBuffer buffer) throws AMQFrameDecodingException + { + long length = buffer.getUnsignedInt(); + if (length == 0) + { + return null; + } + else + { + byte[] result = new byte[(int)length]; + buffer.get(result); + return result; + } + } + + // Will barf with a NPE on a null input. Not sure whether it should return null or + // an empty field-table (which would be slower - perhaps unnecessarily). + // + // Some sample input and the result output: + // + // Input: "a=1" "a=2 c=3" "a=3 c=4 d" "a='four' b='five'" "a=bad" + // + // Parsing <a=1>... + // {a=1} + // Parsing <a=2 c=3>... + // {a=2, c=3} + // Parsing <a=3 c=4 d>... + // {a=3, c=4, d=null} + // Parsing <a='four' b='five'>... + // {a=four, b=five} + // Parsing <a=bad>... + // java.lang.IllegalArgumentException: a: Invalid integer in <bad> from <a=bad>. + // + public static FieldTable createFieldTableFromMessageSelector(String selector) + { + boolean debug = _logger.isDebugEnabled(); + + // TODO: Doesn't support embedded quotes properly. + String[] expressions = selector.split(" +"); + + FieldTable result = new FieldTable(); + + for(int i = 0; i < expressions.length; i++) + { + String expr = expressions[i]; + + if (debug) _logger.debug("Expression = <" + expr + ">"); + + int equals = expr.indexOf('='); + + if (equals < 0) + { + // Existence check + result.put("S" + expr.trim(),null); + } + else + { + String key = expr.substring(0,equals).trim(); + String value = expr.substring(equals + 1).trim(); + + if (debug) _logger.debug("Key = <" + key + ">, Value = <" + value + ">"); + + if (value.charAt(0) == '\'') + { + if (value.charAt(value.length()- 1) != '\'') + { + throw new IllegalArgumentException(key + ": Missing quote in <" + value + "> from <" + selector + ">."); + } + else + { + value = value.substring(1,value.length() - 1); + + result.put("S" + key,value); + } + } + else + { + try + { + int intValue = Integer.parseInt(value); + + result.put("i" + key,value); + } + catch(NumberFormatException e) + { + throw new IllegalArgumentException(key + ": Invalid integer in <" + value + "> from <" + selector + ">."); + + } + } + } + } + + if (debug) _logger.debug("Field-table created from <" + selector + "> is <" + result + ">"); + + return(result); + + } + + static byte[] hexToByteArray(String id) + { + // Should check param for null, long enough for this check, upper-case and trailing char + String s = (id.charAt(1) == 'x') ? id.substring(2) : id; // strip 0x + + int len = s.length(); + int byte_len = len / 2; + byte[] b = new byte[byte_len]; + + for(int i = 0; i < byte_len; i++) + { + // fixme: refine these repetitive subscript calcs. + int ch = i * 2; + + byte b1 = Byte.parseByte(s.substring(ch,ch + 1),16); + byte b2 = Byte.parseByte(s.substring(ch + 1,ch + 2),16); + + b[i] = (byte)(b1 * 16 + b2); + } + + return(b); + } + + public static char[] convertToHexCharArray(byte[] from) + { + int length = from.length; + char[] result_buff = new char[length * 2 + 2]; + + result_buff[0] = '0'; + result_buff[1] = 'x'; + + int bite; + int dest = 2; + + for(int i = 0; i < length; i++) + { + bite = from[i]; + + if (bite < 0) bite += 256; + + result_buff[dest++] = hex_chars[bite >> 4]; + result_buff[dest++] = hex_chars[bite & 0x0f]; + } + + return(result_buff); + } + + public static String convertToHexString(byte[] from) + { + return(new String(convertToHexCharArray(from))); + } + + public static String convertToHexString(ByteBuffer bb) + { + int size = bb.limit(); + + byte[] from = new byte[size]; + + for(int i = 0; i < size; i++) + { + from[i] = bb.get(i); + } + + return(new String(convertToHexCharArray(from))); + } + + public static void main(String[] args) + { + for(int i = 0; i < args.length; i++) + { + String selector = args[i]; + + System.err.println("Parsing <" + selector + ">..."); + + try + { + System.err.println(createFieldTableFromMessageSelector(selector)); + } + catch(IllegalArgumentException e) + { + System.err.println(e); + } + } + } + + private static char hex_chars[] = {'0','1','2','3','4','5','6','7','8','9','a','b','c','d','e','f'}; +} diff --git a/java/common/src/org/apache/qpid/framing/FieldTable.java b/java/common/src/org/apache/qpid/framing/FieldTable.java new file mode 100644 index 0000000000..30f41205dd --- /dev/null +++ b/java/common/src/org/apache/qpid/framing/FieldTable.java @@ -0,0 +1,319 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; + +import java.util.*; + +/** + * From the protocol document: + * field-table = short-integer *field-value-pair + * field-value-pair = field-name field-value + * field-name = short-string + * field-value = 'S' long-string + * / 'I' long-integer + * / 'D' decimal-value + * / 'T' long-integer + * decimal-value = decimals long-integer + * decimals = OCTET + */ +public class FieldTable extends LinkedHashMap +{ + private static final Logger _logger = Logger.getLogger(FieldTable.class); + private long _encodedSize = 0; + + public FieldTable() + { + super(); + } + + /** + * Construct a new field table. + * + * @param buffer the buffer from which to read data. The length byte must be read already + * @param length the length of the field table. Must be > 0. + * @throws AMQFrameDecodingException if there is an error decoding the table + */ + public FieldTable(ByteBuffer buffer, long length) throws AMQFrameDecodingException + { + super(); + final boolean debug = _logger.isDebugEnabled(); + assert length > 0; + _encodedSize = length; + int sizeRead = 0; + while (sizeRead < _encodedSize) + { + int sizeRemaining = buffer.remaining(); + final String key = EncodingUtils.readShortString(buffer); + // TODO: use proper charset decoder + byte iType = buffer.get(); + final char type = (char) iType; + Object value; + switch (type) + { + case 'S': + value = EncodingUtils.readLongString(buffer); + break; + case 'I': + value = new Long(buffer.getUnsignedInt()); + break; + default: + String msg = "Field '" + key + "' - unsupported field table type: " + type; + //some extra debug information... + msg += " (" + iType + "), length=" + length + ", sizeRead=" + sizeRead + ", sizeRemaining=" + sizeRemaining; + throw new AMQFrameDecodingException(msg); + } + sizeRead += (sizeRemaining - buffer.remaining()); + + if (debug) + { + _logger.debug("FieldTable::FieldTable(buffer," + length + "): Read type '" + type + "', key '" + key + "', value '" + value + "' (now read " + sizeRead + " of " + length + " encoded bytes)..."); + } + + // we deliberately want to call put in the parent class since we do + // not need to do the size calculations + super.put(key, value); + } + + if (debug) + { + _logger.debug("FieldTable::FieldTable(buffer," + length + "): Done."); + } + } + + public void writeToBuffer(ByteBuffer buffer) + { + final boolean debug = _logger.isDebugEnabled(); + + if (debug) + { + _logger.debug("FieldTable::writeToBuffer: Writing encoded size of " + _encodedSize + "..."); + } + + // write out the total length, which we have kept up to date as data is added + EncodingUtils.writeUnsignedInteger(buffer, _encodedSize); + final Iterator it = this.entrySet().iterator(); + while (it.hasNext()) + { + Map.Entry me = (Map.Entry) it.next(); + String key = (String) me.getKey(); + + EncodingUtils.writeShortStringBytes(buffer, key); + Object value = me.getValue(); + + if (debug) + { + _logger.debug("FieldTable::writeToBuffer: Writing key '" + key + "' of type " + value.getClass() + ", value '" + value + "'..."); + } + + if (value instanceof byte[]) + { + buffer.put((byte) 'S'); + EncodingUtils.writeLongstr(buffer, (byte[]) value); + } + else if (value instanceof String) + { + // TODO: look at using proper charset encoder + buffer.put((byte) 'S'); + EncodingUtils.writeLongStringBytes(buffer, (String) value); + } + else if (value instanceof Long) + { + // TODO: look at using proper charset encoder + buffer.put((byte) 'I'); + EncodingUtils.writeUnsignedInteger(buffer, ((Long) value).longValue()); + } + else + { + // Should never get here + throw new IllegalArgumentException("Key '" + key + "': Unsupported type in field table, type: " + ((value == null) ? "null-object" : value.getClass())); + } + } + + if (debug) + { + _logger.debug("FieldTable::writeToBuffer: Done."); + } + } + + public byte[] getDataAsBytes() + { + final ByteBuffer buffer = ByteBuffer.allocate((int) _encodedSize); // XXX: Is cast a problem? + final Iterator it = this.entrySet().iterator(); + while (it.hasNext()) + { + Map.Entry me = (Map.Entry) it.next(); + String key = (String) me.getKey(); + EncodingUtils.writeShortStringBytes(buffer, key); + Object value = me.getValue(); + if (value instanceof byte[]) + { + buffer.put((byte) 'S'); + EncodingUtils.writeLongstr(buffer, (byte[]) value); + } + else if (value instanceof String) + { + // TODO: look at using proper charset encoder + buffer.put((byte) 'S'); + EncodingUtils.writeLongStringBytes(buffer, (String) value); + } + else if (value instanceof char[]) + { + // TODO: look at using proper charset encoder + buffer.put((byte) 'S'); + EncodingUtils.writeLongStringBytes(buffer, (char[]) value); + } + else if (value instanceof Long || value instanceof Integer) + { + // TODO: look at using proper charset encoder + buffer.put((byte) 'I'); + EncodingUtils.writeUnsignedInteger(buffer, ((Long) value).longValue()); + } + else + { + // Should never get here + assert false; + } + } + final byte[] result = new byte[(int) _encodedSize]; + buffer.flip(); + buffer.get(result); + buffer.release(); + return result; + } + + public Object put(Object key, Object value) + { + final boolean debug = _logger.isDebugEnabled(); + + if (key == null) + { + throw new IllegalArgumentException("All keys must be Strings - was passed: null"); + } + else if (!(key instanceof String)) + { + throw new IllegalArgumentException("All keys must be Strings - was passed: " + key.getClass()); + } + + Object existing; + + if ((existing = super.remove(key)) != null) + { + if (debug) + { + _logger.debug("Found duplicate of key '" + key + "', previous value '" + existing + "' (" + existing.getClass() + "), to be replaced by '" + value + "', (" + value.getClass() + ") - stack trace of source of duplicate follows...", new Throwable().fillInStackTrace()); + } + + // If we are in effect deleting the value (see comment on null values being deleted + // below) then we also need to remove the name from the encoding length. + if (value == null) + { + _encodedSize -= EncodingUtils.encodedShortStringLength((String) key); + } + + // FIXME: Should be able to short-cut this process if the old and new values are + // the same object and/or type and size... + _encodedSize -= getEncodingSize(existing); + } + else + { + if (value != null) + { + _encodedSize += EncodingUtils.encodedShortStringLength((String) key); + } + } + + // For now: Setting a null value is the equivalent of deleting it. + // This is ambiguous in the JMS spec and needs thrashing out and potentially + // testing against other implementations. + if (value != null) + { + _encodedSize += getEncodingSize(value); + } + + return super.put(key, value); + } + + public Object remove(Object key) + { + if (super.containsKey(key)) + { + final Object value = super.remove(key); + _encodedSize -= EncodingUtils.encodedShortStringLength((String) key); + + // This check is, for now, unnecessary (we don't store null values). + if (value != null) + { + _encodedSize -= getEncodingSize(value); + } + + return value; + } + else + { + return null; + } + } + + /** + * @return unsigned integer + */ + public long getEncodedSize() + { + return _encodedSize; + } + + /** + * @return integer + */ + private static int getEncodingSize(Object value) + { + int encodingSize; + + // the extra byte if for the type indicator that is written out + if (value instanceof String) + { + encodingSize = 1 + EncodingUtils.encodedLongStringLength((String) value); + } + else if (value instanceof char[]) + { + encodingSize = 1 + EncodingUtils.encodedLongStringLength((char[]) value); + } + else if (value instanceof Integer) + { + encodingSize = 1 + 4; + } + else if (value instanceof Long) + { + encodingSize = 1 + 4; + } + else + { + throw new IllegalArgumentException("Unsupported type in field table: " + value.getClass()); + } + + return encodingSize; + } + + public Enumeration keys() + { + return new FieldTableKeyEnumeration(this); + } +} diff --git a/java/common/src/org/apache/qpid/framing/FieldTableKeyEnumeration.java b/java/common/src/org/apache/qpid/framing/FieldTableKeyEnumeration.java new file mode 100644 index 0000000000..2bc890ebbc --- /dev/null +++ b/java/common/src/org/apache/qpid/framing/FieldTableKeyEnumeration.java @@ -0,0 +1,44 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import java.util.Enumeration; +import java.util.Iterator; + + +public class FieldTableKeyEnumeration implements Enumeration +{ + protected FieldTable _table; + protected Iterator _iterator; + + public FieldTableKeyEnumeration(FieldTable ft) + { + _table = ft; + _iterator = ft.keySet().iterator(); + } + + public boolean hasMoreElements() + { + return _iterator.hasNext(); + } + + public Object nextElement() + { + return _iterator.next(); + } +} diff --git a/java/common/src/org/apache/qpid/framing/HeartbeatBody.java b/java/common/src/org/apache/qpid/framing/HeartbeatBody.java new file mode 100644 index 0000000000..4dda794427 --- /dev/null +++ b/java/common/src/org/apache/qpid/framing/HeartbeatBody.java @@ -0,0 +1,54 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public class HeartbeatBody extends AMQBody +{ + public static final byte TYPE = 8; + public static AMQFrame FRAME = new HeartbeatBody().toFrame(); + + protected byte getType() + { + return TYPE; + } + + protected int getSize() + { + return 0;//heartbeats we generate have no payload + } + + protected void writePayload(ByteBuffer buffer) + { + } + + protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + if(size > 0) + { + //allow other implementations to have a payload, but ignore it: + buffer.skip((int) size); + } + } + + public AMQFrame toFrame() + { + return new AMQFrame(0, this); + } +} diff --git a/java/common/src/org/apache/qpid/framing/HeartbeatBodyFactory.java b/java/common/src/org/apache/qpid/framing/HeartbeatBodyFactory.java new file mode 100644 index 0000000000..1d63f3827b --- /dev/null +++ b/java/common/src/org/apache/qpid/framing/HeartbeatBodyFactory.java @@ -0,0 +1,28 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public class HeartbeatBodyFactory implements BodyFactory +{ + public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException + { + return new HeartbeatBody(); + } +} diff --git a/java/common/src/org/apache/qpid/framing/ProtocolInitiation.java b/java/common/src/org/apache/qpid/framing/ProtocolInitiation.java new file mode 100644 index 0000000000..e500a683dc --- /dev/null +++ b/java/common/src/org/apache/qpid/framing/ProtocolInitiation.java @@ -0,0 +1,176 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoSession; +import org.apache.mina.filter.codec.ProtocolDecoderOutput; +import org.apache.qpid.AMQException; + +public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock +{ + public char[] header = new char[]{'A','M','Q','P'}; + // TODO: generate these constants automatically from the xml protocol spec file + + private static byte CURRENT_PROTOCOL_CLASS = 1; + private static final int CURRENT_PROTOCOL_INSTANCE = 1; + + public byte protocolClass = CURRENT_PROTOCOL_CLASS; + public byte protocolInstance = CURRENT_PROTOCOL_INSTANCE; + public byte protocolMajor; + public byte protocolMinor; + +// public ProtocolInitiation() {} + + public ProtocolInitiation(byte major, byte minor) + { + protocolMajor = major; + protocolMinor = minor; + } + + public long getSize() + { + return 4 + 1 + 1 + 1 + 1; + } + + public void writePayload(ByteBuffer buffer) + { + for (int i = 0; i < header.length; i++) + { + buffer.put((byte) header[i]); + } + buffer.put(protocolClass); + buffer.put(protocolInstance); + buffer.put(protocolMajor); + buffer.put(protocolMinor); + } + + public void populateFromBuffer(ByteBuffer buffer) throws AMQException + { + throw new AMQException("Method not implemented"); + } + + public boolean equals(Object o) + { + if (!(o instanceof ProtocolInitiation)) + { + return false; + } + + ProtocolInitiation pi = (ProtocolInitiation) o; + if (pi.header == null) + { + return false; + } + + if (header.length != pi.header.length) + { + return false; + } + + for (int i = 0; i < header.length; i++) + { + if (header[i] != pi.header[i]) + { + return false; + } + } + + return (protocolClass == pi.protocolClass && + protocolInstance == pi.protocolInstance && + protocolMajor == pi.protocolMajor && + protocolMinor == pi.protocolMinor); + } + + public static class Decoder //implements MessageDecoder + { + /** + * + * @param session + * @param in + * @return true if we have enough data to decode the PI frame fully, false if more + * data is required + */ + public boolean decodable(IoSession session, ByteBuffer in) + { + return (in.remaining() >= 8); + } + + public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) + throws Exception + { + byte[] theHeader = new byte[4]; + in.get(theHeader); + ProtocolInitiation pi = new ProtocolInitiation((byte)0, (byte)0); + pi.header = new char[]{(char) theHeader[0],(char) theHeader[CURRENT_PROTOCOL_INSTANCE],(char) theHeader[2], (char) theHeader[3]}; + String stringHeader = new String(pi.header); + if (!"AMQP".equals(stringHeader)) + { + throw new AMQProtocolHeaderException("Invalid protocol header - read " + stringHeader); + } + pi.protocolClass = in.get(); + pi.protocolInstance = in.get(); + pi.protocolMajor = in.get(); + pi.protocolMinor = in.get(); + out.write(pi); + } + } + + public void checkVersion(ProtocolVersionList pvl) throws AMQException + { + if (protocolClass != CURRENT_PROTOCOL_CLASS) + { + throw new AMQProtocolClassException("Protocol class " + CURRENT_PROTOCOL_CLASS + " was expected; received " + + protocolClass); + } + if (protocolInstance != CURRENT_PROTOCOL_INSTANCE) + { + throw new AMQProtocolInstanceException("Protocol instance " + CURRENT_PROTOCOL_INSTANCE + " was expected; received " + + protocolInstance); + } + /* + if (protocolMajor != CURRENT_PROTOCOL_VERSION_MAJOR) + { + throw new AMQProtocolVersionException("Protocol major version " + CURRENT_PROTOCOL_VERSION_MAJOR + + " was expected; received " + protocolMajor); + } + if (protocolMinor != CURRENT_PROTOCOL_VERSION_MINOR) + { + throw new AMQProtocolVersionException("Protocol minor version " + CURRENT_PROTOCOL_VERSION_MINOR + + " was expected; received " + protocolMinor); + } + */ + + /* Look through list of available protocol versions */ + boolean found = false; + for (int i=0; i<pvl.pv.length; i++) + { + if (pvl.pv[i][pvl.PROTOCOL_MAJOR] == protocolMajor && + pvl.pv[i][pvl.PROTOCOL_MINOR] == protocolMinor) + { + found = true; + } + } + if (!found) + { + // TODO: add list of available versions in list to msg... + throw new AMQProtocolVersionException("Protocol version " + + protocolMajor + "." + protocolMinor + " not found in protocol version list."); + } + } +} diff --git a/java/common/src/org/apache/qpid/nio/SocketAcceptor.java b/java/common/src/org/apache/qpid/nio/SocketAcceptor.java new file mode 100644 index 0000000000..1033d16191 --- /dev/null +++ b/java/common/src/org/apache/qpid/nio/SocketAcceptor.java @@ -0,0 +1,31 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.nio; + +import org.apache.mina.common.support.DelegatedIoAcceptor; + +public class SocketAcceptor extends DelegatedIoAcceptor +{ + /** + * Creates a new instance. + */ + public SocketAcceptor() + { + init(new SocketAcceptorDelegate(this)); + } +} diff --git a/java/common/src/org/apache/qpid/nio/SocketAcceptorDelegate.java b/java/common/src/org/apache/qpid/nio/SocketAcceptorDelegate.java new file mode 100644 index 0000000000..7339482ac0 --- /dev/null +++ b/java/common/src/org/apache/qpid/nio/SocketAcceptorDelegate.java @@ -0,0 +1,602 @@ +/* + * @(#) $Id: SocketAcceptorDelegate.java 379346 2006-02-21 05:10:30Z trustin $ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.nio; + +import org.apache.mina.common.*; +import org.apache.mina.common.support.BaseIoAcceptor; +import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; +import org.apache.mina.util.IdentityHashSet; +import org.apache.mina.util.Queue; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.*; + +/** + * {@link IoAcceptor} for socket transport (TCP/IP). + * + * @author The Apache Directory Project (dev@directory.apache.org) + * @version $Rev: 379346 $, $Date: 2006-02-21 05:10:30 +0000 (Tue, 21 Feb 2006) $ + */ +public class SocketAcceptorDelegate extends BaseIoAcceptor +{ + private static volatile int nextId = 0; + + private final IoAcceptor wrapper; + private final int id = nextId ++; + private final String threadName = "SocketAcceptor-" + id; + private final IoServiceConfig defaultConfig = new SocketAcceptorConfig(); + private Selector selector; + private final Map channels = new HashMap(); + private final Hashtable sessions = new Hashtable(); + + private final Queue registerQueue = new Queue(); + private final Queue cancelQueue = new Queue(); + + private Worker worker; + + /** + * Creates a new instance. + */ + public SocketAcceptorDelegate(IoAcceptor wrapper) + { + this.wrapper = wrapper; + } + + /** + * Binds to the specified <code>address</code> and handles incoming + * connections with the specified <code>handler</code>. Backlog value + * is configured to the value of <code>backlog</code> property. + * + * @throws IOException if failed to bind + */ + public void bind(SocketAddress address, IoHandler handler, IoServiceConfig config) throws IOException + { + if (address == null) + { + throw new NullPointerException("address"); + } + + if (handler == null) + { + throw new NullPointerException("handler"); + } + + if (!(address instanceof InetSocketAddress)) + { + throw new IllegalArgumentException("Unexpected address type: " + address.getClass()); + } + + if (((InetSocketAddress) address).getPort() == 0) + { + throw new IllegalArgumentException("Unsupported port number: 0"); + } + + if (config == null) + { + config = getDefaultConfig(); + } + + RegistrationRequest request = new RegistrationRequest(address, handler, config); + + synchronized (this) + { + synchronized (registerQueue) + { + registerQueue.push(request); + } + startupWorker(); + } + + selector.wakeup(); + + synchronized (request) + { + while (!request.done) + { + try + { + request.wait(); + } + catch (InterruptedException e) + { + } + } + } + + if (request.exception != null) + { + throw request.exception; + } + } + + + private synchronized void startupWorker() throws IOException + { + if (worker == null) + { + selector = Selector.open(); + worker = new Worker(); + + worker.start(); + } + } + + public Set getManagedSessions(SocketAddress address) + { + if (address == null) + { + throw new NullPointerException("address"); + } + + Set managedSessions = (Set) sessions.get(address); + + if (managedSessions == null) + { + throw new IllegalArgumentException("Address not bound: " + address); + } + + return Collections.unmodifiableSet( + new IdentityHashSet(Arrays.asList(managedSessions.toArray()))); + } + + public void unbind(SocketAddress address) + { + if (address == null) + { + throw new NullPointerException("address"); + } + + final Set managedSessions = (Set) sessions.get(address); + CancellationRequest request = new CancellationRequest(address); + synchronized (this) + { + try + { + startupWorker(); + } + catch (IOException e) + { + // IOException is thrown only when Worker thread is not + // running and failed to open a selector. We simply throw + // IllegalArgumentException here because we can simply + // conclude that nothing is bound to the selector. + throw new IllegalArgumentException("Address not bound: " + address); + } + + synchronized (cancelQueue) + { + cancelQueue.push(request); + } + } + + selector.wakeup(); + + synchronized (request) + { + while (!request.done) + { + try + { + request.wait(); + } + catch (InterruptedException e) + { + } + } + } + + if (request.exception != null) + { + request.exception.fillInStackTrace(); + + throw request.exception; + } + + // Disconnect all clients + IoServiceConfig cfg = request.registrationRequest.config; + boolean disconnectOnUnbind; + if (cfg instanceof IoAcceptorConfig) + { + disconnectOnUnbind = ((IoAcceptorConfig) cfg).isDisconnectOnUnbind(); + } + else + { + disconnectOnUnbind = ((IoAcceptorConfig) getDefaultConfig()).isDisconnectOnUnbind(); + } + + if (disconnectOnUnbind && managedSessions != null) + { + IoSession[] tempSessions = (IoSession[]) + managedSessions.toArray(new IoSession[ 0 ]); + + final Object lock = new Object(); + + for (int i = 0; i < tempSessions.length; i++) + { + if (!managedSessions.contains(tempSessions[i])) + { + // The session has already been closed and have been + // removed from managedSessions by the SocketIoProcessor. + continue; + } + tempSessions[i].close().setCallback(new IoFuture.Callback() + { + public void operationComplete(IoFuture future) + { + synchronized (lock) + { + lock.notify(); + } + } + }); + } + + try + { + synchronized (lock) + { + while (!managedSessions.isEmpty()) + { + lock.wait(1000); + } + } + } + catch (InterruptedException ie) + { + // Ignored + } + } + } + + public void unbindAll() + { + List addresses; + synchronized (channels) + { + addresses = new ArrayList(channels.keySet()); + } + + for (Iterator i = addresses.iterator(); i.hasNext();) + { + unbind((SocketAddress) i.next()); + } + } + + public boolean isBound(SocketAddress address) + { + synchronized (channels) + { + return channels.containsKey(address); + } + } + + public Set getBoundAddresses() + { + return wrapper.getBoundAddresses(); + } + + private class Worker extends Thread + { + public Worker() + { + super(SocketAcceptorDelegate.this.threadName); + } + + public void run() + { + for (; ;) + { + try + { + int nKeys = selector.select(); + + registerNew(); + cancelKeys(); + + if (nKeys > 0) + { + processSessions(selector.selectedKeys()); + } + + if (selector.keys().isEmpty()) + { + synchronized (SocketAcceptorDelegate.this) + { + if (selector.keys().isEmpty() && + registerQueue.isEmpty() && + cancelQueue.isEmpty()) + { + worker = null; + try + { + selector.close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + finally + { + selector = null; + } + break; + } + } + } + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + + try + { + Thread.sleep(1000); + } + catch (InterruptedException e1) + { + } + } + } + } + + private void processSessions(Set keys) throws IOException + { + Iterator it = keys.iterator(); + while (it.hasNext()) + { + SelectionKey key = (SelectionKey) it.next(); + + it.remove(); + + if (!key.isAcceptable()) + { + continue; + } + + ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); + + SocketChannel ch = ssc.accept(); + + if (ch == null) + { + continue; + } + + boolean success = false; + SocketSessionImpl session = null; + try + { + RegistrationRequest req = (RegistrationRequest) key.attachment(); + session = new SocketSessionImpl( + SocketAcceptorDelegate.this.wrapper, + (Set) sessions.get(req.address), + (SocketSessionConfig) req.config.getSessionConfig(), + ch, req.handler, + req.address); + getFilterChainBuilder().buildFilterChain(session.getFilterChain()); + req.config.getFilterChainBuilder().buildFilterChain(session.getFilterChain()); + ((SocketFilterChain) session.getFilterChain()).sessionCreated(session); + session.getManagedSessions().add(session); + session.getIoProcessor().addNew(session); + success = true; + } + catch (Throwable t) + { + ExceptionMonitor.getInstance().exceptionCaught(t); + } + finally + { + if (!success) + { + if (session != null) + { + session.getManagedSessions().remove(session); + } + ch.close(); + } + } + } + } + } + + public IoServiceConfig getDefaultConfig() + { + return defaultConfig; + } + + private void registerNew() + { + if (registerQueue.isEmpty()) + { + return; + } + + for (; ;) + { + RegistrationRequest req; + + synchronized (registerQueue) + { + req = (RegistrationRequest) registerQueue.pop(); + } + + if (req == null) + { + break; + } + + ServerSocketChannel ssc = null; + + try + { + ssc = ServerSocketChannel.open(); + ssc.configureBlocking(false); + + // Configure the server socket, + SocketAcceptorConfig cfg; + if (req.config instanceof SocketAcceptorConfig) + { + cfg = (SocketAcceptorConfig) req.config; + } + else + { + cfg = (SocketAcceptorConfig) getDefaultConfig(); + } + + ssc.socket().setReuseAddress(cfg.isReuseAddress()); + ssc.socket().setReceiveBufferSize( + ((SocketSessionConfig) cfg.getSessionConfig()).getReceiveBufferSize()); + + // and bind. + ssc.socket().bind(req.address, cfg.getBacklog()); + ssc.register(selector, SelectionKey.OP_ACCEPT, req); + + synchronized (channels) + { + channels.put(req.address, ssc); + } + sessions.put(req.address, Collections.synchronizedSet(new HashSet())); + } + catch (IOException e) + { + req.exception = e; + } + finally + { + synchronized (req) + { + req.done = true; + + req.notify(); + } + + if (ssc != null && req.exception != null) + { + try + { + ssc.close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + } + } + } + } + + + private void cancelKeys() + { + if (cancelQueue.isEmpty()) + { + return; + } + + for (; ;) + { + CancellationRequest request; + + synchronized (cancelQueue) + { + request = (CancellationRequest) cancelQueue.pop(); + } + + if (request == null) + { + break; + } + + sessions.remove(request.address); + ServerSocketChannel ssc; + synchronized (channels) + { + ssc = (ServerSocketChannel) channels.remove(request.address); + } + + // close the channel + try + { + if (ssc == null) + { + request.exception = new IllegalArgumentException("Address not bound: " + request.address); + } + else + { + SelectionKey key = ssc.keyFor(selector); + request.registrationRequest = (RegistrationRequest) key.attachment(); + key.cancel(); + + selector.wakeup(); // wake up again to trigger thread death + + ssc.close(); + } + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + finally + { + synchronized (request) + { + request.done = true; + request.notify(); + } + } + } + } + + private static class RegistrationRequest + { + private final SocketAddress address; + private final IoHandler handler; + private final IoServiceConfig config; + private IOException exception; + private boolean done; + + private RegistrationRequest(SocketAddress address, IoHandler handler, IoServiceConfig config) + { + this.address = address; + this.handler = handler; + this.config = config; + } + } + + + private static class CancellationRequest + { + private final SocketAddress address; + private boolean done; + private RegistrationRequest registrationRequest; + private RuntimeException exception; + + private CancellationRequest(SocketAddress address) + { + this.address = address; + } + } +} diff --git a/java/common/src/org/apache/qpid/nio/SocketConnector.java b/java/common/src/org/apache/qpid/nio/SocketConnector.java new file mode 100644 index 0000000000..ce74fd0c96 --- /dev/null +++ b/java/common/src/org/apache/qpid/nio/SocketConnector.java @@ -0,0 +1,32 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.nio; + +import org.apache.mina.common.support.DelegatedIoConnector; +import org.apache.mina.transport.socket.nio.support.*; + +public class SocketConnector extends DelegatedIoConnector +{ + /** + * Creates a new instance. + */ + public SocketConnector() + { + init(new SocketConnectorDelegate(this)); + } +}
\ No newline at end of file diff --git a/java/common/src/org/apache/qpid/nio/SocketConnectorDelegate.java b/java/common/src/org/apache/qpid/nio/SocketConnectorDelegate.java new file mode 100644 index 0000000000..50c122d1c8 --- /dev/null +++ b/java/common/src/org/apache/qpid/nio/SocketConnectorDelegate.java @@ -0,0 +1,402 @@ +/* + * @(#) $Id: SocketConnectorDelegate.java 379044 2006-02-20 07:40:37Z trustin $ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.nio; + +import org.apache.mina.common.*; +import org.apache.mina.common.support.BaseIoConnector; +import org.apache.mina.common.support.DefaultConnectFuture; +import org.apache.mina.transport.socket.nio.SocketConnectorConfig; +import org.apache.mina.util.Queue; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +/** + * {@link IoConnector} for socket transport (TCP/IP). + * + * @author The Apache Directory Project (dev@directory.apache.org) + * @version $Rev: 379044 $, $Date: 2006-02-20 07:40:37 +0000 (Mon, 20 Feb 2006) $ + */ +public class SocketConnectorDelegate extends BaseIoConnector +{ + private static volatile int nextId = 0; + + private final IoConnector wrapper; + private final int id = nextId++; + private final String threadName = "SocketConnector-" + id; + private final IoServiceConfig defaultConfig = new SocketConnectorConfig(); + private Selector selector; + private final Queue connectQueue = new Queue(); + private final Set managedSessions = Collections.synchronizedSet(new HashSet()); + private Worker worker; + + /** + * Creates a new instance. + */ + public SocketConnectorDelegate(IoConnector wrapper) + { + this.wrapper = wrapper; + } + + public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config) + { + return connect(address, null, handler, config); + } + + public ConnectFuture connect(SocketAddress address, SocketAddress localAddress, + IoHandler handler, IoServiceConfig config) + { + if (address == null) + { + throw new NullPointerException("address"); + } + if (handler == null) + { + throw new NullPointerException("handler"); + } + + if (! (address instanceof InetSocketAddress)) + { + throw new IllegalArgumentException("Unexpected address type: " + + address.getClass()); + } + + if (localAddress != null && !(localAddress instanceof InetSocketAddress)) + { + throw new IllegalArgumentException("Unexpected local address type: " + + localAddress.getClass()); + } + + if (config == null) + { + config = getDefaultConfig(); + } + + SocketChannel ch = null; + boolean success = false; + try + { + ch = SocketChannel.open(); + ch.socket().setReuseAddress(true); + if (localAddress != null) + { + ch.socket().bind(localAddress); + } + + ch.configureBlocking(false); + + if (ch.connect(address)) + { + SocketSessionImpl session = newSession(ch, handler, config); + success = true; + ConnectFuture future = new DefaultConnectFuture(); + future.setSession(session); + return future; + } + + success = true; + } + catch (IOException e) + { + return DefaultConnectFuture.newFailedFuture(e); + } + finally + { + if (!success && ch != null) + { + try + { + ch.close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + } + } + + ConnectionRequest request = new ConnectionRequest(ch, handler, config); + synchronized (this) + { + try + { + startupWorker(); + } + catch (IOException e) + { + try + { + ch.close(); + } + catch (IOException e2) + { + ExceptionMonitor.getInstance().exceptionCaught(e2); + } + + return DefaultConnectFuture.newFailedFuture(e); + } + synchronized (connectQueue) + { + connectQueue.push(request); + } + selector.wakeup(); + } + + return request; + } + + public IoServiceConfig getDefaultConfig() + { + return defaultConfig; + } + + private synchronized void startupWorker() throws IOException + { + if (worker == null) + { + selector = Selector.open(); + worker = new Worker(); + worker.start(); + } + } + + private void registerNew() + { + if (connectQueue.isEmpty()) + { + return; + } + + for (; ;) + { + ConnectionRequest req; + synchronized (connectQueue) + { + req = (ConnectionRequest) connectQueue.pop(); + } + + if (req == null) + { + break; + } + + SocketChannel ch = req.channel; + try + { + ch.register(selector, SelectionKey.OP_CONNECT, req); + } + catch (IOException e) + { + req.setException(e); + } + } + } + + private void processSessions(Set keys) + { + Iterator it = keys.iterator(); + + while (it.hasNext()) + { + SelectionKey key = (SelectionKey) it.next(); + + if (!key.isConnectable()) + { + continue; + } + + SocketChannel ch = (SocketChannel) key.channel(); + ConnectionRequest entry = (ConnectionRequest) key.attachment(); + + boolean success = false; + try + { + ch.finishConnect(); + SocketSessionImpl session = newSession(ch, entry.handler, entry.config); + entry.setSession(session); + success = true; + } + catch (Throwable e) + { + entry.setException(e); + } + finally + { + key.cancel(); + if (!success) + { + try + { + ch.close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + } + } + } + + keys.clear(); + } + + private void processTimedOutSessions(Set keys) + { + long currentTime = System.currentTimeMillis(); + Iterator it = keys.iterator(); + + while (it.hasNext()) + { + SelectionKey key = (SelectionKey) it.next(); + + if (!key.isValid()) + { + continue; + } + + ConnectionRequest entry = (ConnectionRequest) key.attachment(); + + if (currentTime >= entry.deadline) + { + entry.setException(new ConnectException()); + key.cancel(); + } + } + } + + private SocketSessionImpl newSession(SocketChannel ch, IoHandler handler, IoServiceConfig config) throws IOException + { + SocketSessionImpl session = new SocketSessionImpl( + wrapper, managedSessions, + config.getSessionConfig(), + ch, handler, ch.socket().getRemoteSocketAddress()); + try + { + getFilterChainBuilder().buildFilterChain(session.getFilterChain()); + config.getFilterChainBuilder().buildFilterChain(session.getFilterChain()); + ((SocketFilterChain) session.getFilterChain()).sessionCreated(session); + } + catch (Throwable e) + { + throw (IOException) new IOException("Failed to create a session.").initCause(e); + } + session.getManagedSessions().add(session); + session.getIoProcessor().addNew(session); + return session; + } + + private class Worker extends Thread + { + public Worker() + { + super(SocketConnectorDelegate.this.threadName); + } + + public void run() + { + for (; ;) + { + try + { + int nKeys = selector.select(1000); + + registerNew(); + + if (nKeys > 0) + { + processSessions(selector.selectedKeys()); + } + + processTimedOutSessions(selector.keys()); + + if (selector.keys().isEmpty()) + { + synchronized (SocketConnectorDelegate.this) + { + if (selector.keys().isEmpty() && + connectQueue.isEmpty()) + { + worker = null; + try + { + selector.close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + finally + { + selector = null; + } + break; + } + } + } + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + + try + { + Thread.sleep(1000); + } + catch (InterruptedException e1) + { + } + } + } + } + } + + private class ConnectionRequest extends DefaultConnectFuture + { + private final SocketChannel channel; + private final long deadline; + private final IoHandler handler; + private final IoServiceConfig config; + + private ConnectionRequest(SocketChannel channel, IoHandler handler, IoServiceConfig config) + { + this.channel = channel; + long timeout; + if (config instanceof IoConnectorConfig) + { + timeout = ((IoConnectorConfig) config).getConnectTimeoutMillis(); + } + else + { + timeout = ((IoConnectorConfig) getDefaultConfig()).getConnectTimeoutMillis(); + } + this.deadline = System.currentTimeMillis() + timeout; + this.handler = handler; + this.config = config; + } + } +}
\ No newline at end of file diff --git a/java/common/src/org/apache/qpid/nio/SocketFilterChain.java b/java/common/src/org/apache/qpid/nio/SocketFilterChain.java new file mode 100644 index 0000000000..07143512b5 --- /dev/null +++ b/java/common/src/org/apache/qpid/nio/SocketFilterChain.java @@ -0,0 +1,48 @@ +package org.apache.qpid.nio; + +import java.io.IOException; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.IoFilter.WriteRequest; +import org.apache.mina.common.support.AbstractIoFilterChain; +import org.apache.mina.util.Queue; + +/** + * An {@link IoFilterChain} for socket transport (TCP/IP). + * + * @author The Apache Directory Project + */ +class SocketFilterChain extends AbstractIoFilterChain { + + public SocketFilterChain( IoSession parent ) + { + super( parent ); + } + + protected void doWrite( IoSession session, WriteRequest writeRequest ) + { + SocketSessionImpl s = ( SocketSessionImpl ) session; + Queue writeRequestQueue = s.getWriteRequestQueue(); + + // SocketIoProcessor.doFlush() will reset it after write is finished + // because the buffer will be passed with messageSent event. + ( ( ByteBuffer ) writeRequest.getMessage() ).mark(); + synchronized( writeRequestQueue ) + { + writeRequestQueue.push( writeRequest ); + if( writeRequestQueue.size() == 1 && session.getTrafficMask().isWritable() ) + { + // Notify SocketIoProcessor only when writeRequestQueue was empty. + s.getIoProcessor().flush( s ); + } + } + } + + protected void doClose( IoSession session ) throws IOException + { + SocketSessionImpl s = ( SocketSessionImpl ) session; + s.getIoProcessor().remove( s ); + } +} diff --git a/java/common/src/org/apache/qpid/nio/SocketIoProcessor.java b/java/common/src/org/apache/qpid/nio/SocketIoProcessor.java new file mode 100644 index 0000000000..d20ab41b96 --- /dev/null +++ b/java/common/src/org/apache/qpid/nio/SocketIoProcessor.java @@ -0,0 +1,770 @@ +/* + * @(#) $Id: SocketIoProcessor.java 372449 2006-01-26 05:24:58Z trustin $ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.nio; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.ExceptionMonitor; +import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoFilter.WriteRequest; +import org.apache.mina.common.WriteTimeoutException; +import org.apache.mina.util.Queue; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.Set; + +/** + * Performs all I/O operations for sockets which is connected or bound. + * This class is used by MINA internally. + * + * @author The Apache Directory Project (dev@directory.apache.org) + * @version $Rev: 372449 $, $Date: 2006-01-26 05:24:58 +0000 (Thu, 26 Jan 2006) $, + */ +class SocketIoProcessor +{ + private static final Logger _logger = Logger.getLogger(SocketIoProcessor.class); + + private static final String PROCESSORS_PROPERTY = "mina.socket.processors"; + private static final String THREAD_PREFIX = "SocketIoProcessor-"; + private static final int DEFAULT_PROCESSORS = 1; + private static final int PROCESSOR_COUNT; + private static final SocketIoProcessor[] PROCESSORS; + + private static int nextId; + + static + { + PROCESSOR_COUNT = configureProcessorCount(); + PROCESSORS = createProcessors(); + } + + /** + * Returns the {@link SocketIoProcessor} to be used for a newly + * created session + * + * @return The processor to be employed + */ + static synchronized SocketIoProcessor getInstance() + { + SocketIoProcessor processor = PROCESSORS[nextId ++]; + nextId %= PROCESSOR_COUNT; + return processor; + } + + private final String threadName; + private Selector selector; + + private final Queue newSessions = new Queue(); + private final Queue removingSessions = new Queue(); + private final Queue flushingSessions = new Queue(); + private final Queue trafficControllingSessions = new Queue(); + + private Worker worker; + private long lastIdleCheckTime = System.currentTimeMillis(); + + private SocketIoProcessor(String threadName) + { + this.threadName = threadName; + } + + void addNew(SocketSessionImpl session) throws IOException + { + synchronized (this) + { + synchronized (newSessions) + { + newSessions.push(session); + } + startupWorker(); + } + + selector.wakeup(); + } + + void remove(SocketSessionImpl session) throws IOException + { + scheduleRemove(session); + startupWorker(); + selector.wakeup(); + } + + private synchronized void startupWorker() throws IOException + { + if (worker == null) + { + selector = Selector.open(); + worker = new Worker(); + worker.start(); + } + } + + void flush(SocketSessionImpl session) + { + scheduleFlush(session); + Selector selector = this.selector; + if (selector != null) + { + selector.wakeup(); + } + } + + void updateTrafficMask(SocketSessionImpl session) + { + scheduleTrafficControl(session); + Selector selector = this.selector; + if (selector != null) + { + selector.wakeup(); + } + } + + private void scheduleRemove(SocketSessionImpl session) + { + synchronized (removingSessions) + { + removingSessions.push(session); + } + } + + private void scheduleFlush(SocketSessionImpl session) + { + synchronized (flushingSessions) + { + flushingSessions.push(session); + } + } + + private void scheduleTrafficControl(SocketSessionImpl session) + { + synchronized (trafficControllingSessions) + { + trafficControllingSessions.push(session); + } + } + + private void doAddNew() + { + if (newSessions.isEmpty()) + { + return; + } + + SocketSessionImpl session; + + for (; ;) + { + synchronized (newSessions) + { + session = (SocketSessionImpl) newSessions.pop(); + } + + if (session == null) + { + break; + } + + SocketChannel ch = session.getChannel(); + boolean registered; + + try + { + ch.configureBlocking(false); + session.setSelectionKey(ch.register(selector, + SelectionKey.OP_READ, + session)); + registered = true; + } + catch (IOException e) + { + session.getManagedSessions().remove(session); + registered = false; + ((SocketFilterChain) session.getFilterChain()).exceptionCaught(session, e); + } + + if (registered) + { + ((SocketFilterChain) session.getFilterChain()).sessionOpened(session); + } + } + } + + private void doRemove() + { + if (removingSessions.isEmpty()) + { + return; + } + + for (; ;) + { + SocketSessionImpl session; + + synchronized (removingSessions) + { + session = (SocketSessionImpl) removingSessions.pop(); + } + + if (session == null) + { + break; + } + + SocketChannel ch = session.getChannel(); + SelectionKey key = session.getSelectionKey(); + // Retry later if session is not yet fully initialized. + // (In case that Session.close() is called before addSession() is processed) + if (key == null) + { + scheduleRemove(session); + break; + } + // skip if channel is already closed + if (!key.isValid()) + { + continue; + } + + try + { + key.cancel(); + ch.close(); + } + catch (IOException e) + { + ((SocketFilterChain) session.getFilterChain()).exceptionCaught(session, e); + } + finally + { + releaseWriteBuffers(session); + session.getManagedSessions().remove(session); + + ((SocketFilterChain) session.getFilterChain()).sessionClosed(session); + session.getCloseFuture().setClosed(); + } + } + } + + private void process(Set selectedKeys) + { + Iterator it = selectedKeys.iterator(); + + while (it.hasNext()) + { + SelectionKey key = (SelectionKey) it.next(); + SocketSessionImpl session = (SocketSessionImpl) key.attachment(); + + if (key.isReadable() && session.getTrafficMask().isReadable()) + { + read(session); + } + + if (key.isWritable() && session.getTrafficMask().isWritable()) + { + scheduleFlush(session); + } + } + + selectedKeys.clear(); + } + + private void read(SocketSessionImpl session) + { + ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize()); + SocketChannel ch = session.getChannel(); + + try + { + int readBytes = 0; + int ret; + + buf.clear(); + + try + { + while ((ret = ch.read(buf.buf())) > 0) + { + readBytes += ret; + } + } + finally + { + buf.flip(); + } + + session.increaseReadBytes(readBytes); + + if (readBytes > 0) + { + /*ByteBuffer newBuf = ByteBuffer.allocate(readBytes); + newBuf.put(buf); + newBuf.flip();*/ + //((SocketFilterChain) session.getFilterChain()).messageReceived(session, newBuf); + ((SocketFilterChain) session.getFilterChain()).messageReceived(session, buf); + } + if (ret < 0) + { + scheduleRemove(session); + } + } + catch (Throwable e) + { + if (e instanceof IOException) + { + scheduleRemove(session); + } + buf.release(); + ((SocketFilterChain) session.getFilterChain()).exceptionCaught(session, e); + } + /*finally + { + buf.release(); + } */ + } + + private void notifyIdleness() + { + // process idle sessions + long currentTime = System.currentTimeMillis(); + if ((currentTime - lastIdleCheckTime) >= 1000) + { + lastIdleCheckTime = currentTime; + Set keys = selector.keys(); + if (keys != null) + { + for (Iterator it = keys.iterator(); it.hasNext();) + { + SelectionKey key = (SelectionKey) it.next(); + SocketSessionImpl session = (SocketSessionImpl) key.attachment(); + notifyIdleness(session, currentTime); + } + } + } + } + + private void notifyIdleness(SocketSessionImpl session, long currentTime) + { + notifyIdleness0( + session, currentTime, + session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), + IdleStatus.BOTH_IDLE, + Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); + notifyIdleness0( + session, currentTime, + session.getIdleTimeInMillis(IdleStatus.READER_IDLE), + IdleStatus.READER_IDLE, + Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE))); + notifyIdleness0( + session, currentTime, + session.getIdleTimeInMillis(IdleStatus.WRITER_IDLE), + IdleStatus.WRITER_IDLE, + Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE))); + + notifyWriteTimeout(session, currentTime, session + .getWriteTimeoutInMillis(), session.getLastWriteTime()); + } + + private void notifyIdleness0(SocketSessionImpl session, long currentTime, + long idleTime, IdleStatus status, + long lastIoTime) + { + if (idleTime > 0 && lastIoTime != 0 + && (currentTime - lastIoTime) >= idleTime) + { + session.increaseIdleCount(status); + ((SocketFilterChain) session.getFilterChain()).sessionIdle(session, status); + } + } + + private void notifyWriteTimeout(SocketSessionImpl session, + long currentTime, + long writeTimeout, long lastIoTime) + { + SelectionKey key = session.getSelectionKey(); + if (writeTimeout > 0 + && (currentTime - lastIoTime) >= writeTimeout + && key != null && key.isValid() + && (key.interestOps() & SelectionKey.OP_WRITE) != 0) + { + ((SocketFilterChain) session.getFilterChain()).exceptionCaught(session, new WriteTimeoutException()); + } + } + + private void doFlush() + { + if (flushingSessions.size() == 0) + { + return; + } + + for (; ;) + { + SocketSessionImpl session; + + synchronized (flushingSessions) + { + session = (SocketSessionImpl) flushingSessions.pop(); + } + + if (session == null) + { + break; + } + + if (!session.isConnected()) + { + releaseWriteBuffers(session); + continue; + } + + SelectionKey key = session.getSelectionKey(); + // Retry later if session is not yet fully initialized. + // (In case that Session.write() is called before addSession() is processed) + if (key == null) + { + scheduleFlush(session); + break; + } + // skip if channel is already closed + if (!key.isValid()) + { + continue; + } + + try + { + doFlush(session); + } + catch (IOException e) + { + scheduleRemove(session); + ((SocketFilterChain) session.getFilterChain()).exceptionCaught(session, e); + } + } + } + + private void releaseWriteBuffers(SocketSessionImpl session) + { + Queue writeRequestQueue = session.getWriteRequestQueue(); + WriteRequest req; + + while ((req = (WriteRequest) writeRequestQueue.pop()) != null) + { + try + { + ((ByteBuffer) req.getMessage()).release(); + } + catch (IllegalStateException e) + { + ((SocketFilterChain) session.getFilterChain()).exceptionCaught(session, e); + } + finally + { + req.getFuture().setWritten(false); + } + } + } + + private void doFlush(SocketSessionImpl session) throws IOException + { + // Clear OP_WRITE + SelectionKey key = session.getSelectionKey(); + key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); + + SocketChannel ch = session.getChannel(); + Queue writeRequestQueue = session.getWriteRequestQueue(); + + WriteRequest req; + for (; ;) + { + synchronized (writeRequestQueue) + { + req = (WriteRequest) writeRequestQueue.first(); + } + + if (req == null) + { + break; + } + + ByteBuffer buf = (ByteBuffer) req.getMessage(); + if (buf.remaining() == 0) + { + synchronized (writeRequestQueue) + { + writeRequestQueue.pop(); + } + + session.increaseWrittenWriteRequests(); + buf.reset(); + ((SocketFilterChain) session.getFilterChain()).messageSent(session, req); + continue; + } + + int writtenBytes = ch.write(buf.buf()); + if (writtenBytes > 0) + { + session.increaseWrittenBytes(writtenBytes); + } + + if (buf.hasRemaining()) + { + //_logger.info("Kernel buf full"); + // Kernel buffer is full + key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); + selector.wakeup(); + break; + } + } + } + + private void doUpdateTrafficMask() + { + if (trafficControllingSessions.isEmpty()) + { + return; + } + + for (; ;) + { + SocketSessionImpl session; + + synchronized (trafficControllingSessions) + { + session = (SocketSessionImpl) trafficControllingSessions.pop(); + } + + if (session == null) + { + break; + } + + SelectionKey key = session.getSelectionKey(); + // Retry later if session is not yet fully initialized. + // (In case that Session.suspend??() or session.resume??() is + // called before addSession() is processed) + if (key == null) + { + scheduleTrafficControl(session); + break; + } + // skip if channel is already closed + if (!key.isValid()) + { + continue; + } + + // The normal is OP_READ and, if there are write requests in the + // session's write queue, set OP_WRITE to trigger flushing. + int ops = SelectionKey.OP_READ; + Queue writeRequestQueue = session.getWriteRequestQueue(); + synchronized (writeRequestQueue) + { + if (!writeRequestQueue.isEmpty()) + { + ops |= SelectionKey.OP_WRITE; + } + } + + // Now mask the preferred ops with the mask of the current session + int mask = session.getTrafficMask().getInterestOps(); + key.interestOps(ops & mask); + } + } + + /** + * Configures the number of processors employed. + * We first check for a system property "mina.IoProcessors". If this + * property is present and can be interpreted as an integer value greater + * or equal to 1, this value is used as the number of processors. + * Otherwise a default of 1 processor is employed. + * + * @return The nubmer of processors to employ + */ + private static int configureProcessorCount() + { + int processors = DEFAULT_PROCESSORS; + String processorProperty = System.getProperty(PROCESSORS_PROPERTY); + if (processorProperty != null) + { + try + { + processors = Integer.parseInt(processorProperty); + } + catch (NumberFormatException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + processors = Math.max(processors, 1); + + System.setProperty(PROCESSORS_PROPERTY, String.valueOf(processors)); + } + + return processors; + } + + private static SocketIoProcessor[] createProcessors() + { + SocketIoProcessor[] processors = new SocketIoProcessor[ PROCESSOR_COUNT ]; + for (int i = 0; i < PROCESSOR_COUNT; i ++) + { + processors[i] = new SocketIoProcessor(THREAD_PREFIX + i); + } + return processors; + } + + private class WorkerFlusher implements Runnable + { + private volatile boolean _shutdown = false; + + private volatile boolean _sleep = false; + + private final Object _lock = new Object(); + + public void run() + { + while (!_shutdown) + { + doFlush(); + try + { + sleep(); + } + catch (InterruptedException e) + { + // IGNORE + } + } + _logger.info("Flusher shutting down"); + } + + private void sleep() throws InterruptedException + { + synchronized (_lock) + { + while (_sleep && !_shutdown) + { + _logger.debug("Flusher going to sleep"); + _lock.wait(); + } + _sleep = true; + } + } + + void wakeup() + { + synchronized (_lock) + { + if (_sleep) + { + _logger.debug("Waking up flusher"); + _sleep = false; + _lock.notify(); + } + } + } + + void shutdown() + { + _shutdown = true; + wakeup(); + } + } + + private class Worker extends Thread + { + private WorkerFlusher _flusher; + + public Worker() + { + super(SocketIoProcessor.this.threadName); + _flusher = new WorkerFlusher(); + new Thread(_flusher, SocketIoProcessor.this.threadName + "Flusher").start(); + } + + public void run() + { + for (; ;) + { + try + { + int nKeys = selector.select(1000); + doAddNew(); + doUpdateTrafficMask(); + + if (nKeys > 0) + { + process(selector.selectedKeys()); + } + + //doFlush(); + // in case the flusher has gone to sleep we wake it up + if (flushingSessions.size() > 0) + { + _flusher.wakeup(); + } + doRemove(); + notifyIdleness(); + + if (selector.keys().isEmpty()) + { + synchronized (SocketIoProcessor.this) + { + if (selector.keys().isEmpty() && + newSessions.isEmpty()) + { + worker = null; + _flusher.shutdown(); + try + { + selector.close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + finally + { + selector = null; + } + break; + } + } + } + } + catch (Throwable t) + { + ExceptionMonitor.getInstance().exceptionCaught(t); + + try + { + Thread.sleep(1000); + } + catch (InterruptedException e1) + { + } + } + } + } + } + +} diff --git a/java/common/src/org/apache/qpid/nio/SocketSessionImpl.java b/java/common/src/org/apache/qpid/nio/SocketSessionImpl.java new file mode 100644 index 0000000000..f7c74f7a14 --- /dev/null +++ b/java/common/src/org/apache/qpid/nio/SocketSessionImpl.java @@ -0,0 +1,404 @@ +/* + * @(#) $Id: SocketSessionImpl.java 385247 2006-03-12 05:06:11Z trustin $ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.nio; + +import org.apache.mina.common.IoFilter.WriteRequest; +import org.apache.mina.common.*; +import org.apache.mina.common.support.BaseIoSession; +import org.apache.mina.common.support.BaseIoSessionConfig; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; +import org.apache.mina.util.Queue; + +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.util.Set; + +/** + * An {@link IoSession} for socket transport (TCP/IP). + * + * @author The Apache Directory Project (dev@directory.apache.org) + * @version $Rev: 385247 $, $Date: 2006-03-12 05:06:11 +0000 (Sun, 12 Mar 2006) $ + */ +class SocketSessionImpl extends BaseIoSession +{ + private final IoService manager; + private final SocketSessionConfig config = new SocketSessionConfigImpl(); + private final SocketIoProcessor ioProcessor; + private final SocketFilterChain filterChain; + private final SocketChannel ch; + private final Queue writeRequestQueue; + private final IoHandler handler; + private final SocketAddress remoteAddress; + private final SocketAddress localAddress; + private final SocketAddress serviceAddress; + private final Set managedSessions; + private SelectionKey key; + private int readBufferSize; + + /** + * Creates a new instance. + */ + public SocketSessionImpl( + IoService manager, Set managedSessions, + IoSessionConfig config, + SocketChannel ch, IoHandler defaultHandler, + SocketAddress serviceAddress ) + { + this.manager = manager; + this.managedSessions = managedSessions; + this.ioProcessor = SocketIoProcessor.getInstance(); + this.filterChain = new SocketFilterChain( this ); + this.ch = ch; + this.writeRequestQueue = new Queue(); + this.handler = defaultHandler; + this.remoteAddress = ch.socket().getRemoteSocketAddress(); + this.localAddress = ch.socket().getLocalSocketAddress(); + this.serviceAddress = serviceAddress; + + // Apply the initial session settings + if( config instanceof SocketSessionConfig ) + { + SocketSessionConfig cfg = ( SocketSessionConfig ) config; + this.config.setKeepAlive( cfg.isKeepAlive() ); + this.config.setOobInline( cfg.isOobInline() ); + this.config.setReceiveBufferSize( cfg.getReceiveBufferSize() ); + this.readBufferSize = cfg.getReceiveBufferSize(); + this.config.setReuseAddress( cfg.isReuseAddress() ); + this.config.setSendBufferSize( cfg.getSendBufferSize() ); + this.config.setSoLinger( cfg.getSoLinger() ); + this.config.setTcpNoDelay( cfg.isTcpNoDelay() ); + + if( this.config.getTrafficClass() != cfg.getTrafficClass() ) + { + this.config.setTrafficClass( cfg.getTrafficClass() ); + } + } + } + + public IoService getService() + { + return manager; + } + + public IoSessionConfig getConfig() + { + return config; + } + + SocketIoProcessor getIoProcessor() + { + return ioProcessor; + } + + public IoFilterChain getFilterChain() + { + return filterChain; + } + + SocketChannel getChannel() + { + return ch; + } + + Set getManagedSessions() + { + return managedSessions; + } + + SelectionKey getSelectionKey() + { + return key; + } + + void setSelectionKey( SelectionKey key ) + { + this.key = key; + } + + public IoHandler getHandler() + { + return handler; + } + + protected void close0() + { + filterChain.filterClose( this ); + } + + Queue getWriteRequestQueue() + { + return writeRequestQueue; + } + + public int getScheduledWriteRequests() + { + synchronized( writeRequestQueue ) + { + return writeRequestQueue.size(); + } + } + + public int getScheduledWriteBytes() + { + synchronized( writeRequestQueue ) + { + return writeRequestQueue.byteSize(); + } + } + + protected void write0( WriteRequest writeRequest ) + { + filterChain.filterWrite( this, writeRequest ); + } + + public TransportType getTransportType() + { + return TransportType.SOCKET; + } + + public SocketAddress getRemoteAddress() + { + return remoteAddress; + } + + public SocketAddress getLocalAddress() + { + return localAddress; + } + + public SocketAddress getServiceAddress() + { + return serviceAddress; + } + + protected void updateTrafficMask() + { + this.ioProcessor.updateTrafficMask( this ); + } + + int getReadBufferSize() + { + return readBufferSize; + } + + private class SocketSessionConfigImpl extends BaseIoSessionConfig implements SocketSessionConfig + { + public boolean isKeepAlive() + { + try + { + return ch.socket().getKeepAlive(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setKeepAlive( boolean on ) + { + try + { + ch.socket().setKeepAlive( on ); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public boolean isOobInline() + { + try + { + return ch.socket().getOOBInline(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setOobInline( boolean on ) + { + try + { + ch.socket().setOOBInline( on ); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public boolean isReuseAddress() + { + try + { + return ch.socket().getReuseAddress(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setReuseAddress( boolean on ) + { + try + { + ch.socket().setReuseAddress( on ); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public int getSoLinger() + { + try + { + return ch.socket().getSoLinger(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setSoLinger( int linger ) + { + try + { + if( linger < 0 ) + { + ch.socket().setSoLinger( false, 0 ); + } + else + { + ch.socket().setSoLinger( true, linger ); + } + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public boolean isTcpNoDelay() + { + try + { + return ch.socket().getTcpNoDelay(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setTcpNoDelay( boolean on ) + { + try + { + ch.socket().setTcpNoDelay( on ); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public int getTrafficClass() + { + try + { + return ch.socket().getTrafficClass(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setTrafficClass( int tc ) + { + try + { + ch.socket().setTrafficClass( tc ); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public int getSendBufferSize() + { + try + { + return ch.socket().getSendBufferSize(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setSendBufferSize( int size ) + { + try + { + ch.socket().setSendBufferSize( size ); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public int getReceiveBufferSize() + { + try + { + return ch.socket().getReceiveBufferSize(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setReceiveBufferSize( int size ) + { + try + { + ch.socket().setReceiveBufferSize( size ); + SocketSessionImpl.this.readBufferSize = size; + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + } +} diff --git a/java/common/src/org/apache/qpid/pool/Event.java b/java/common/src/org/apache/qpid/pool/Event.java new file mode 100644 index 0000000000..6bf86ffc2e --- /dev/null +++ b/java/common/src/org/apache/qpid/pool/Event.java @@ -0,0 +1,111 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.pool; + +import org.apache.log4j.Logger; +import org.apache.mina.common.IoFilter; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.IdleStatus; + +/** + * Represents an operation on IoFilter. + */ +enum EventType +{ + OPENED, CLOSED, READ, WRITE, WRITTEN, RECEIVED, SENT, IDLE, EXCEPTION +} + +class Event +{ + private static final Logger _log = Logger.getLogger(Event.class); + + private final EventType type; + private final IoFilter.NextFilter nextFilter; + private final Object data; + + public Event(IoFilter.NextFilter nextFilter, EventType type, Object data) + { + this.type = type; + this.nextFilter = nextFilter; + this.data = data; + if (type == EventType.EXCEPTION) + { + _log.error("Exception event constructed: " + data, (Exception) data); + } + } + + public Object getData() + { + return data; + } + + + public IoFilter.NextFilter getNextFilter() + { + return nextFilter; + } + + + public EventType getType() + { + return type; + } + + void process(IoSession session) + { + if (_log.isDebugEnabled()) + { + _log.debug("Processing " + this); + } + if (type == EventType.RECEIVED) + { + nextFilter.messageReceived(session, data); + //ByteBufferUtil.releaseIfPossible( data ); + } + else if (type == EventType.SENT) + { + nextFilter.messageSent(session, data); + //ByteBufferUtil.releaseIfPossible( data ); + } + else if (type == EventType.EXCEPTION) + { + nextFilter.exceptionCaught(session, (Throwable) data); + } + else if (type == EventType.IDLE) + { + nextFilter.sessionIdle(session, (IdleStatus) data); + } + else if (type == EventType.OPENED) + { + nextFilter.sessionOpened(session); + } + else if (type == EventType.WRITE) + { + nextFilter.filterWrite(session, (IoFilter.WriteRequest) data); + } + else if (type == EventType.CLOSED) + { + nextFilter.sessionClosed(session); + } + } + + public String toString() + { + return "Event: type " + type + ", data: " + data; + } +} diff --git a/java/common/src/org/apache/qpid/pool/Job.java b/java/common/src/org/apache/qpid/pool/Job.java new file mode 100644 index 0000000000..45a115bcd3 --- /dev/null +++ b/java/common/src/org/apache/qpid/pool/Job.java @@ -0,0 +1,110 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.pool; + +import org.apache.mina.common.IoSession; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Holds events for a session that will be processed asynchronously by + * the thread pool in PoolingFilter. + */ +class Job implements Runnable +{ + private final int _maxEvents; + private final IoSession _session; + private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>(); + private final AtomicBoolean _active = new AtomicBoolean(); + private final AtomicInteger _refCount = new AtomicInteger(); + private final JobCompletionHandler _completionHandler; + + Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents) + { + _session = session; + _completionHandler = completionHandler; + _maxEvents = maxEvents; + } + + void acquire() + { + _refCount.incrementAndGet(); + } + + void release() + { + _refCount.decrementAndGet(); + } + + boolean isReferenced() + { + return _refCount.get() > 0; + } + + void add(Event evt) + { + _eventQueue.add(evt); + } + + void processAll() + { + //limit the number of events processed in one run + for (int i = 0; i < _maxEvents; i++) + { + Event e = _eventQueue.poll(); + if (e == null) + { + break; + } + else + { + e.process(_session); + } + } + } + + boolean isComplete() + { + return _eventQueue.peek() == null; + } + + boolean activate() + { + return _active.compareAndSet(false, true); + } + + void deactivate() + { + _active.set(false); + } + + public void run() + { + processAll(); + deactivate(); + _completionHandler.completed(_session, this); + } + + + static interface JobCompletionHandler + { + public void completed(IoSession session, Job job); + } +} diff --git a/java/common/src/org/apache/qpid/pool/PoolingFilter.java b/java/common/src/org/apache/qpid/pool/PoolingFilter.java new file mode 100644 index 0000000000..ecb0bd5048 --- /dev/null +++ b/java/common/src/org/apache/qpid/pool/PoolingFilter.java @@ -0,0 +1,183 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.pool; + +import org.apache.log4j.Logger; +import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoFilterAdapter; +import org.apache.mina.common.IoSession; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler +{ + private static final Logger _logger = Logger.getLogger(PoolingFilter.class); + public static final Set<EventType> READ_EVENTS = new HashSet<EventType>(Arrays.asList(EventType.RECEIVED)); + public static final Set<EventType> WRITE_EVENTS = new HashSet<EventType>(Arrays.asList(EventType.WRITE)); + + private final ConcurrentMap<IoSession, Job> _jobs = new ConcurrentHashMap<IoSession, Job>(); + private final ReferenceCountingExecutorService _poolReference; + private final Set<EventType> _asyncTypes; + + private final String _name; + private final int _maxEvents = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); + + public PoolingFilter(ReferenceCountingExecutorService refCountingPool, Set<EventType> asyncTypes, String name) + { + _poolReference = refCountingPool; + _asyncTypes = asyncTypes; + _name = name; + } + + private void fireEvent(IoSession session, Event event) + { + if (_asyncTypes.contains(event.getType())) + { + Job job = getJobForSession(session); + job.acquire(); //prevents this job being removed from _jobs + job.add(event); + if (job.activate()) + { + _poolReference.getPool().execute(job); + } + } + else + { + event.process(session); + } + } + + private Job getJobForSession(IoSession session) + { + Job job = _jobs.get(session); + return job == null ? createJobForSession(session) : job; + } + + private Job createJobForSession(IoSession session) + { + return addJobForSession(session, new Job(session, this, _maxEvents)); + } + + private Job addJobForSession(IoSession session, Job job) + { + //atomic so ensures all threads agree on the same job + Job existing = _jobs.putIfAbsent(session, job); + return existing == null ? job : existing; + } + + //Job.JobCompletionHandler + public void completed(IoSession session, Job job) + { + if (job.isComplete()) + { + job.release(); + if (!job.isReferenced()) + { + _jobs.remove(session); + } + } + else + { + if (job.activate()) + { + _poolReference.getPool().execute(job); + } + } + } + + //IoFilter methods that are processed by threads on the pool + + public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception + { + fireEvent(session, new Event(nextFilter, EventType.OPENED, null)); + } + + public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception + { + fireEvent(session, new Event(nextFilter, EventType.CLOSED, null)); + } + + public void sessionIdle(NextFilter nextFilter, IoSession session, + IdleStatus status) throws Exception + { + fireEvent(session, new Event(nextFilter, EventType.IDLE, status)); + } + + public void exceptionCaught(NextFilter nextFilter, IoSession session, + Throwable cause) throws Exception + { + fireEvent(session, new Event(nextFilter, EventType.EXCEPTION, cause)); + } + + public void messageReceived(NextFilter nextFilter, IoSession session, + Object message) throws Exception + { + //ByteBufferUtil.acquireIfPossible( message ); + fireEvent(session, new Event(nextFilter, EventType.RECEIVED, message)); + } + + public void messageSent(NextFilter nextFilter, IoSession session, + Object message) throws Exception + { + //ByteBufferUtil.acquireIfPossible( message ); + fireEvent(session, new Event(nextFilter, EventType.SENT, message)); + } + + public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception + { + fireEvent(session, new Event(nextFilter, EventType.WRITE, writeRequest)); + } + + //IoFilter methods that are processed on current thread (NOT on pooled thread) + + public void filterClose(NextFilter nextFilter, IoSession session) throws Exception + { + nextFilter.filterClose(session); + } + + public void sessionCreated(NextFilter nextFilter, IoSession session) + { + nextFilter.sessionCreated(session); + } + + public String toString() + { + return _name; + } + + // LifeCycle methods + + public void init() + { + _logger.info("Init called on PoolingFilter " + toString()); + // called when the filter is initialised in the chain. If the reference count is + // zero this acquire will initialise the pool + _poolReference.acquireExecutorService(); + } + + public void destroy() + { + _logger.info("Destroy called on PoolingFilter " + toString()); + // when the reference count gets to zero we release the executor service + _poolReference.releaseExecutorService(); + } +} diff --git a/java/common/src/org/apache/qpid/pool/ReadWriteThreadModel.java b/java/common/src/org/apache/qpid/pool/ReadWriteThreadModel.java new file mode 100644 index 0000000000..5de2bf8d0e --- /dev/null +++ b/java/common/src/org/apache/qpid/pool/ReadWriteThreadModel.java @@ -0,0 +1,37 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.pool; + +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.ReferenceCountingIoFilter; +import org.apache.mina.common.ThreadModel; + +public class ReadWriteThreadModel implements ThreadModel +{ + public void buildFilterChain(IoFilterChain chain) throws Exception + { + ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance(); + PoolingFilter asyncRead = new PoolingFilter(executor, PoolingFilter.READ_EVENTS, + "AsynchronousReadFilter"); + PoolingFilter asyncWrite = new PoolingFilter(executor, PoolingFilter.WRITE_EVENTS, + "AsynchronousWriteFilter"); + + chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(asyncRead)); + chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(asyncWrite)); + } +} diff --git a/java/common/src/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/java/common/src/org/apache/qpid/pool/ReferenceCountingExecutorService.java new file mode 100644 index 0000000000..f048a12b90 --- /dev/null +++ b/java/common/src/org/apache/qpid/pool/ReferenceCountingExecutorService.java @@ -0,0 +1,95 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.pool; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * We share the executor service among several PoolingFilters. This class reference counts + * how many filter chains are using the executor service and destroys the service, thus + * freeing up its threads, when the count reaches zero. It recreates the service when + * the count is incremented. + * + * This is particularly important on the client where failing to destroy the executor + * service prevents the JVM from shutting down due to the existence of non-daemon threads. + * + */ +public class ReferenceCountingExecutorService +{ + private static final int MINIMUM_POOL_SIZE = 4; + private static final int NUM_CPUS = Runtime.getRuntime().availableProcessors(); + private static final int DEFAULT_POOL_SIZE = Math.max(NUM_CPUS, MINIMUM_POOL_SIZE); + + /** + * We need to be able to check the current reference count and if necessary + * create the executor service atomically. + */ + private static final ReferenceCountingExecutorService _instance = new ReferenceCountingExecutorService(); + + private final Object _lock = new Object(); + + private ExecutorService _pool; + + private int _refCount = 0; + + private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE); + + public static ReferenceCountingExecutorService getInstance() + { + return _instance; + } + + private ReferenceCountingExecutorService() + { + } + + ExecutorService acquireExecutorService() + { + synchronized (_lock) + { + if (_refCount++ == 0) + { + _pool = Executors.newFixedThreadPool(_poolSize); + } + return _pool; + } + } + + void releaseExecutorService() + { + synchronized (_lock) + { + if (--_refCount == 0) + { + _pool.shutdownNow(); + } + } + } + + /** + * The filters that use the executor service should call this method to get access + * to the service. Note that this method does not alter the reference count. + * + * @return the underlying executor service + */ + public ExecutorService getPool() + { + return _pool; + } +} diff --git a/java/common/src/org/apache/qpid/protocol/AMQConstant.java b/java/common/src/org/apache/qpid/protocol/AMQConstant.java new file mode 100644 index 0000000000..0716104688 --- /dev/null +++ b/java/common/src/org/apache/qpid/protocol/AMQConstant.java @@ -0,0 +1,105 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.protocol; + +import java.util.Map; +import java.util.HashMap; + +public final class AMQConstant +{ + private int _code; + + private String _name; + + private static Map _codeMap = new HashMap(); + + private AMQConstant(int code, String name, boolean map) + { + _code = code; + _name = name; + if (map) + { + _codeMap.put(new Integer(code), this); + } + } + + public String toString() + { + return _code + ": " + _name; + } + + public int getCode() + { + return _code; + } + + public String getName() + { + return _name; + } + + public static final AMQConstant FRAME_MIN_SIZE = new AMQConstant(4096, "frame min size", true); + + public static final AMQConstant FRAME_END = new AMQConstant(206, "frame end", true); + + public static final AMQConstant REPLY_SUCCESS = new AMQConstant(200, "reply success", true); + + public static final AMQConstant NOT_DELIVERED = new AMQConstant(310, "not delivered", true); + + public static final AMQConstant MESSAGE_TOO_LARGE = new AMQConstant(311, "message too large", true); + + public static final AMQConstant NO_ROUTE = new AMQConstant(312, "no route", true); + + public static final AMQConstant NO_CONSUMERS = new AMQConstant(313, "no consumers", true); + + public static final AMQConstant CONTEXT_IN_USE = new AMQConstant(320, "context in use", true); + + public static final AMQConstant CONTEXT_UNKNOWN = new AMQConstant(321, "context unknown", true); + + public static final AMQConstant INVALID_PATH = new AMQConstant(402, "invalid path", true); + + public static final AMQConstant ACCESS_REFUSED = new AMQConstant(403, "access refused", true); + + public static final AMQConstant NOT_FOUND = new AMQConstant(404, "not found", true); + + public static final AMQConstant FRAME_ERROR = new AMQConstant(501, "frame error", true); + + public static final AMQConstant SYNTAX_ERROR = new AMQConstant(502, "syntax error", true); + + public static final AMQConstant COMMAND_INVALID = new AMQConstant(503, "command invalid", true); + + public static final AMQConstant CHANNEL_ERROR = new AMQConstant(504, "channel error", true); + + public static final AMQConstant RESOURCE_ERROR = new AMQConstant(506, "resource error", true); + + public static final AMQConstant NOT_ALLOWED = new AMQConstant(507, "not allowed", true); + + public static final AMQConstant NOT_IMPLEMENTED = new AMQConstant(540, "not implemented", true); + + public static final AMQConstant INTERNAL_ERROR = new AMQConstant(541, "internal error", true); + + public static AMQConstant getConstant(int code) + { + AMQConstant c = (AMQConstant) _codeMap.get(new Integer(code)); + if (c == null) + { + c = new AMQConstant(code, "unknown code", false); + } + return c; + } +} diff --git a/java/common/src/org/apache/qpid/ssl/BogusSSLContextFactory.java b/java/common/src/org/apache/qpid/ssl/BogusSSLContextFactory.java new file mode 100644 index 0000000000..c066fd0370 --- /dev/null +++ b/java/common/src/org/apache/qpid/ssl/BogusSSLContextFactory.java @@ -0,0 +1,156 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.ssl; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.io.InputStream; +import java.security.GeneralSecurityException; +import java.security.KeyStore; + +/** + * Factory to create a bogus SSLContext. This means that it is easy to test SSL but this + * cannot be used in a production environment. + * <p/> + * This is based on the sample that comes with MINA, written by Trustin Lee + */ +public class BogusSSLContextFactory +{ + /** + * Protocol to use. + */ + private static final String PROTOCOL = "TLS"; + + /** + * Bougus Server certificate keystore file name. + */ + private static final String BOGUS_KEYSTORE = "qpid.cert"; + + // NOTE: The keystore was generated using keytool: + // keytool -genkey -alias qpid -keysize 512 -validity 3650 + // -keyalg RSA -dname "CN=amqp.org" -keypass qpidpw + // -storepass qpidpw -keystore qpid.cert + + private static final char[] BOGUS_KEYSTORE_PASSWORD = {'q', 'p', 'i', 'd', 'p', 'w'}; + + private static SSLContext serverInstance = null; + + private static SSLContext clientInstance = null; + + /** + * Get SSLContext singleton. + * + * @return SSLContext + * @throws java.security.GeneralSecurityException + */ + public static SSLContext getInstance(boolean server) + throws GeneralSecurityException + { + SSLContext retInstance; + if (server) + { + // FIXME: looks like double-checking locking + if (serverInstance == null) + { + synchronized (BogusSSLContextFactory.class) + { + if (serverInstance == null) + { + try + { + serverInstance = createBougusServerSSLContext(); + } + catch (Exception ioe) + { + throw new GeneralSecurityException( + "Can't create Server SSLContext:" + ioe); + } + } + } + } + retInstance = serverInstance; + } + else + { + // FIXME: looks like double-checking locking + if (clientInstance == null) + { + synchronized (BogusSSLContextFactory.class) + { + if (clientInstance == null) + { + clientInstance = createBougusClientSSLContext(); + } + } + } + retInstance = clientInstance; + } + return retInstance; + } + + private static SSLContext createBougusServerSSLContext() + throws GeneralSecurityException, IOException + { + // Create keystore + KeyStore ks = KeyStore.getInstance("JKS"); + InputStream in = null; + try + { + in = BogusSSLContextFactory.class.getResourceAsStream(BOGUS_KEYSTORE); + if (in == null) + { + throw new IOException("Unable to load keystore resource: " + BOGUS_KEYSTORE); + } + ks.load(in, BOGUS_KEYSTORE_PASSWORD); + } + finally + { + if (in != null) + { + //noinspection EmptyCatchBlock + try + { + in.close(); + } + catch (IOException ignored) + { + } + } + } + + // Set up key manager factory to use our key store + KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509"); + kmf.init(ks, BOGUS_KEYSTORE_PASSWORD); + + // Initialize the SSLContext to work with our key managers. + SSLContext sslContext = SSLContext.getInstance(PROTOCOL); + sslContext.init(kmf.getKeyManagers(), BogusTrustManagerFactory.X509_MANAGERS, null); + + return sslContext; + } + + private static SSLContext createBougusClientSSLContext() + throws GeneralSecurityException + { + SSLContext context = SSLContext.getInstance(PROTOCOL); + context.init(null, BogusTrustManagerFactory.X509_MANAGERS, null); + return context; + } + +} diff --git a/java/common/src/org/apache/qpid/ssl/BogusTrustManagerFactory.java b/java/common/src/org/apache/qpid/ssl/BogusTrustManagerFactory.java new file mode 100644 index 0000000000..8a71e3d7c8 --- /dev/null +++ b/java/common/src/org/apache/qpid/ssl/BogusTrustManagerFactory.java @@ -0,0 +1,79 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.ssl; + +import javax.net.ssl.ManagerFactoryParameters; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactorySpi; +import javax.net.ssl.X509TrustManager; +import java.security.InvalidAlgorithmParameterException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; + +/** + * Bogus trust manager factory. Used to make testing SSL simpler - i.e no need to + * mess about with keystores. + * <p/> + * This is based on the example that comes with MINA, written by Trustin Lee. + */ +class BogusTrustManagerFactory extends TrustManagerFactorySpi +{ + + static final X509TrustManager X509 = new X509TrustManager() + { + public void checkClientTrusted(X509Certificate[] x509Certificates, + String s) throws CertificateException + { + } + + public void checkServerTrusted(X509Certificate[] x509Certificates, + String s) throws CertificateException + { + } + + public X509Certificate[] getAcceptedIssuers() + { + return new X509Certificate[ 0 ]; + } + }; + + static final TrustManager[] X509_MANAGERS = new TrustManager[]{X509}; + + public BogusTrustManagerFactory() + { + } + + protected TrustManager[] engineGetTrustManagers() + { + return X509_MANAGERS; + } + + protected void engineInit(KeyStore keystore) throws KeyStoreException + { + // noop + } + + protected void engineInit( + ManagerFactoryParameters managerFactoryParameters) + throws InvalidAlgorithmParameterException + { + // noop + } +} diff --git a/java/common/src/org/apache/qpid/ssl/SSLServerSocketFactory.java b/java/common/src/org/apache/qpid/ssl/SSLServerSocketFactory.java new file mode 100644 index 0000000000..f2d5396637 --- /dev/null +++ b/java/common/src/org/apache/qpid/ssl/SSLServerSocketFactory.java @@ -0,0 +1,105 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.ssl; + +import javax.net.ServerSocketFactory; +import java.io.IOException; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.security.GeneralSecurityException; + +/** + * Simple Server Socket factory to create sockets with or without SSL enabled. + * If SSL enabled a "bogus" SSL Context is used (suitable for test purposes) + * <p/> + * This is based on the example that comes with MINA, written by Trustin Lee. + */ +public class SSLServerSocketFactory extends javax.net.ServerSocketFactory +{ + private static boolean sslEnabled = false; + + private static javax.net.ServerSocketFactory sslFactory = null; + + private static ServerSocketFactory factory = null; + + public SSLServerSocketFactory() + { + super(); + } + + public ServerSocket createServerSocket(int port) throws IOException + { + return new ServerSocket(port); + } + + public ServerSocket createServerSocket(int port, int backlog) + throws IOException + { + return new ServerSocket(port, backlog); + } + + public ServerSocket createServerSocket(int port, int backlog, + InetAddress ifAddress) + throws IOException + { + return new ServerSocket(port, backlog, ifAddress); + } + + public static javax.net.ServerSocketFactory getServerSocketFactory() + throws IOException + { + if (isSslEnabled()) + { + if (sslFactory == null) + { + try + { + sslFactory = BogusSSLContextFactory.getInstance(true) + .getServerSocketFactory(); + } + catch (GeneralSecurityException e) + { + IOException ioe = new IOException( + "could not create SSL socket"); + ioe.initCause(e); + throw ioe; + } + } + return sslFactory; + } + else + { + if (factory == null) + { + factory = new SSLServerSocketFactory(); + } + return factory; + } + + } + + public static boolean isSslEnabled() + { + return sslEnabled; + } + + public static void setSslEnabled(boolean newSslEnabled) + { + sslEnabled = newSslEnabled; + } +} diff --git a/java/common/src/org/apache/qpid/ssl/SSLSocketFactory.java b/java/common/src/org/apache/qpid/ssl/SSLSocketFactory.java new file mode 100644 index 0000000000..31dccb593e --- /dev/null +++ b/java/common/src/org/apache/qpid/ssl/SSLSocketFactory.java @@ -0,0 +1,135 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.ssl; + +import javax.net.SocketFactory; +import java.io.IOException; +import java.net.InetAddress; +import java.net.Socket; +import java.net.UnknownHostException; +import java.security.GeneralSecurityException; + +/** + * Simple Socket factory to create sockets with or without SSL enabled. + * If SSL enabled a "bogus" SSL Context is used (suitable for test purposes). + * <p/> + * This is based on an example that comes with MINA, written by Trustin Lee. + */ +public class SSLSocketFactory extends SocketFactory +{ + private static boolean sslEnabled = false; + + private static javax.net.ssl.SSLSocketFactory sslFactory = null; + + private static javax.net.SocketFactory factory = null; + + public SSLSocketFactory() + { + super(); + } + + public Socket createSocket(String arg1, int arg2) throws IOException, + UnknownHostException + { + if (isSslEnabled()) + { + return getSSLFactory().createSocket(arg1, arg2); + } + else + { + return new Socket(arg1, arg2); + } + } + + public Socket createSocket(String arg1, int arg2, InetAddress arg3, + int arg4) throws IOException, + UnknownHostException + { + if (isSslEnabled()) + { + return getSSLFactory().createSocket(arg1, arg2, arg3, arg4); + } + else + { + return new Socket(arg1, arg2, arg3, arg4); + } + } + + public Socket createSocket(InetAddress arg1, int arg2) + throws IOException + { + if (isSslEnabled()) + { + return getSSLFactory().createSocket(arg1, arg2); + } + else + { + return new Socket(arg1, arg2); + } + } + + public Socket createSocket(InetAddress arg1, int arg2, InetAddress arg3, + int arg4) throws IOException + { + if (isSslEnabled()) + { + return getSSLFactory().createSocket(arg1, arg2, arg3, arg4); + } + else + { + return new Socket(arg1, arg2, arg3, arg4); + } + } + + public static javax.net.SocketFactory getSocketFactory() + { + if (factory == null) + { + factory = new SSLSocketFactory(); + } + return factory; + } + + private javax.net.ssl.SSLSocketFactory getSSLFactory() + { + if (sslFactory == null) + { + try + { + sslFactory = BogusSSLContextFactory.getInstance(false) + .getSocketFactory(); + } + catch (GeneralSecurityException e) + { + throw new RuntimeException("could not create SSL socket", e); + } + } + return sslFactory; + } + + public static boolean isSslEnabled() + { + return sslEnabled; + } + + public static void setSslEnabled(boolean newSslEnabled) + { + sslEnabled = newSslEnabled; + } + +} diff --git a/java/common/src/org/apache/qpid/url/AMQBindingURL.java b/java/common/src/org/apache/qpid/url/AMQBindingURL.java new file mode 100644 index 0000000000..5ea1a55f2a --- /dev/null +++ b/java/common/src/org/apache/qpid/url/AMQBindingURL.java @@ -0,0 +1,260 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.url; + +import org.apache.qpid.url.BindingURL; +import org.apache.qpid.url.URLHelper; +import org.apache.qpid.exchange.ExchangeDefaults; + +import java.util.HashMap; +import java.net.URI; +import java.net.URISyntaxException; + +public class AMQBindingURL implements BindingURL +{ + String _url; + String _exchangeClass; + String _exchangeName; + String _destinationName; + String _queueName; + private HashMap<String, String> _options; + + + public AMQBindingURL(String url) throws URLSyntaxException + { + //format: + // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* + + _url = url; + _options = new HashMap<String, String>(); + + parseBindingURL(); + } + + private void parseBindingURL() throws URLSyntaxException + { + try + { + URI connection = new URI(_url); + + String exchangeClass = connection.getScheme(); + + if (exchangeClass == null) + { + _url = ExchangeDefaults.DIRECT_EXCHANGE_CLASS + "://" + + ExchangeDefaults.DIRECT_EXCHANGE_NAME + "//" + _url; + //URLHelper.parseError(-1, "Exchange Class not specified.", _url); + parseBindingURL(); + return; + } + else + { + setExchangeClass(exchangeClass); + } + + String exchangeName = connection.getHost(); + + if (exchangeName == null) + { + URLHelper.parseError(-1, "Exchange Name not specified.", _url); + } + else + { + setExchangeName(exchangeName); + } + + if (connection.getPath() == null || + connection.getPath().equals("")) + { + URLHelper.parseError(_url.indexOf(_exchangeName) + _exchangeName.length(), + "Destination or Queue requried", _url); + } + else + { + int slash = connection.getPath().indexOf("/", 1); + if (slash == -1) + { + URLHelper.parseError(_url.indexOf(_exchangeName) + _exchangeName.length(), + "Destination requried", _url); + } + else + { + String path = connection.getPath(); + setDestinationName(path.substring(1, slash)); + + setQueueName(path.substring(slash + 1)); + + } + } + + URLHelper.parseOptions(_options, connection.getQuery()); + + processOptions(); + + //Fragment is #string (not used) + //System.out.println(connection.getFragment()); + + } + catch (URISyntaxException uris) + { + + URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput()); + + } + } + + private void processOptions() + { + //this is where we would parse any options that needed more than just storage. + } + + public String getURL() + { + return _url; + } + + public String getExchangeClass() + { + return _exchangeClass; + } + + public void setExchangeClass(String exchangeClass) + { + _exchangeClass = exchangeClass; + } + + public String getExchangeName() + { + return _exchangeName; + } + + public void setExchangeName(String name) + { + _exchangeName = name; + + if (name.equals(ExchangeDefaults.TOPIC_EXCHANGE_NAME)) + { + setOption(BindingURL.OPTION_EXCLUSIVE, "true"); + } + } + + public String getDestinationName() + { + return _destinationName; + } + + public void setDestinationName(String name) + { + _destinationName = name; + } + + public String getQueueName() + { + if (_exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)) + { + if (Boolean.parseBoolean(getOption(OPTION_DURABLE))) + { + if (containsOption(BindingURL.OPTION_CLIENTID) && containsOption(BindingURL.OPTION_SUBSCRIPTION)) + { + return getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION); + } + else + { + return getDestinationName(); + } + } + else + { + return getDestinationName(); + } + } + else + { + return _queueName; + } + } + + public void setQueueName(String name) + { + _queueName = name; + } + + public String getOption(String key) + { + return _options.get(key); + } + + public void setOption(String key, String value) + { + _options.put(key, value); + } + + public boolean containsOption(String key) + { + return _options.containsKey(key); + } + + public String getRoutingKey() + { + if (_exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)) + { + return getQueueName(); + } + + if (containsOption(BindingURL.OPTION_ROUTING_KEY)) + { + return getOption(OPTION_ROUTING_KEY); + } + + return getDestinationName(); + } + + public void setRoutingKey(String key) + { + setOption(OPTION_ROUTING_KEY, key); + } + + + public String toString() + { + StringBuffer sb = new StringBuffer(); + + sb.append(_exchangeClass); + sb.append("://"); + sb.append(_exchangeName); + sb.append('/'); + sb.append(_destinationName); + sb.append('/'); + sb.append(_queueName); + + sb.append(URLHelper.printOptions(_options)); + return sb.toString(); + } + + public static void main(String args[]) throws URLSyntaxException + { + String url = "exchangeClass://exchangeName/Destination/Queue?option='value',option2='value2'"; + + AMQBindingURL dest = new AMQBindingURL(url); + + System.out.println(url); + System.out.println(dest); + + } + +}
\ No newline at end of file diff --git a/java/common/src/org/apache/qpid/url/BindingURL.java b/java/common/src/org/apache/qpid/url/BindingURL.java new file mode 100644 index 0000000000..77802b0e17 --- /dev/null +++ b/java/common/src/org/apache/qpid/url/BindingURL.java @@ -0,0 +1,65 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.url; + +import java.util.List; + +/* + Binding URL format: + <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* +*/ +public interface BindingURL +{ + public static final String OPTION_EXCLUSIVE = "exclusive"; + public static final String OPTION_AUTODELETE = "autodelete"; + public static final String OPTION_DURABLE = "durable"; + public static final String OPTION_CLIENTID = "clientid"; + public static final String OPTION_SUBSCRIPTION = "subscription"; + public static final String OPTION_ROUTING_KEY = "routingkey"; + + + String getURL(); + + String getExchangeClass(); + + void setExchangeClass(String exchangeClass); + + String getExchangeName(); + + void setExchangeName(String name); + + String getDestinationName(); + + void setDestinationName(String name); + + String getQueueName(); + + void setQueueName(String name); + + String getOption(String key); + + void setOption(String key, String value); + + boolean containsOption(String key); + + String getRoutingKey(); + + void setRoutingKey(String key); + + String toString(); +} diff --git a/java/common/src/org/apache/qpid/url/URLHelper.java b/java/common/src/org/apache/qpid/url/URLHelper.java new file mode 100644 index 0000000000..959735d438 --- /dev/null +++ b/java/common/src/org/apache/qpid/url/URLHelper.java @@ -0,0 +1,173 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.url; + +import java.util.HashMap; + +public class URLHelper +{ + public static char DEFAULT_OPTION_SEPERATOR = '&'; + public static char ALTERNATIVE_OPTION_SEPARATOR = ','; + public static char BROKER_SEPARATOR = ';'; + + public static void parseOptions(HashMap<String, String> optionMap, String options) throws URLSyntaxException + { + //options looks like this + //brokerlist='tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'',failover='method?option='value',option='value'' + + if (options == null || options.indexOf('=') == -1) + { + return; + } + + int optionIndex = options.indexOf('='); + + String option = options.substring(0, optionIndex); + + int length = options.length(); + + int nestedQuotes = 0; + + // to store index of final "'" + int valueIndex = optionIndex; + + //Walk remainder of url. + while (nestedQuotes > 0 || valueIndex < length) + { + valueIndex++; + + if (valueIndex >= length) + { + break; + } + + if (options.charAt(valueIndex) == '\'') + { + if (valueIndex + 1 < options.length()) + { + if (options.charAt(valueIndex + 1) == DEFAULT_OPTION_SEPERATOR || + options.charAt(valueIndex + 1) == ALTERNATIVE_OPTION_SEPARATOR || + options.charAt(valueIndex + 1) == BROKER_SEPARATOR || + options.charAt(valueIndex + 1) == '\'') + { + nestedQuotes--; +// System.out.println( +// options + "\n" + "-" + nestedQuotes + ":" + getPositionString(valueIndex - 2, 1)); + if (nestedQuotes == 0) + { + //We've found the value of an option + break; + } + } + else + { + nestedQuotes++; +// System.out.println( +// options + "\n" + "+" + nestedQuotes + ":" + getPositionString(valueIndex - 2, 1)); + } + } + else + { + // We are at the end of the string + // Check to see if we are corectly closing quotes + if (options.charAt(valueIndex) == '\'') + { + nestedQuotes--; + } + + break; + } + } + } + + if (nestedQuotes != 0 || valueIndex < (optionIndex + 2)) + { + int sepIndex = 0; + + //Try and identify illegal separator character + if (nestedQuotes > 1) + { + for (int i = 0; i < nestedQuotes; i++) + { + sepIndex = options.indexOf('\'', sepIndex); + sepIndex++; + } + } + + if (sepIndex >= options.length() || sepIndex == 0) + { + parseError(valueIndex, "Unterminated option", options); + } + else + { + parseError(sepIndex, "Unterminated option. Possible illegal option separator:'" + + options.charAt(sepIndex) + "'", options); + } + } + + // optionIndex +2 to skip "='" + String value = options.substring(optionIndex + 2, valueIndex); + + optionMap.put(option, value); + + if (valueIndex < (options.length() - 1)) + { + //Recurse to get remaining options + parseOptions(optionMap, options.substring(valueIndex + 2)); + } + } + + + public static void parseError(int index, String error, String url) throws URLSyntaxException + { + parseError(index, 1, error, url); + } + + public static void parseError(int index, int length, String error, String url) throws URLSyntaxException + { + throw new URLSyntaxException(url, error, index, length); + } + + public static String printOptions(HashMap<String, String> options) + { + if (options.isEmpty()) + { + return ""; + } + else + { + StringBuffer sb = new StringBuffer(); + sb.append('?'); + for (String key : options.keySet()) + { + sb.append(key); + + sb.append("='"); + + sb.append(options.get(key)); + + sb.append("'"); + sb.append(DEFAULT_OPTION_SEPERATOR); + } + + sb.deleteCharAt(sb.length() - 1); + + return sb.toString(); + } + } +} diff --git a/java/common/src/org/apache/qpid/url/URLSyntaxException.java b/java/common/src/org/apache/qpid/url/URLSyntaxException.java new file mode 100644 index 0000000000..b454069826 --- /dev/null +++ b/java/common/src/org/apache/qpid/url/URLSyntaxException.java @@ -0,0 +1,94 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.url; + +import java.net.URISyntaxException; + +public class URLSyntaxException extends URISyntaxException +{ + private int _length; + + public URLSyntaxException(String url, String error, int index, int length) + { + super(url, error, index); + + _length = length; + } + + private static String getPositionString(int index, int length) + { + StringBuffer sb = new StringBuffer(index + 1); + + for (int i = 0; i < index; i++) + { + sb.append(" "); + } + + if (length > -1) + { + for (int i = 0; i < length; i++) + { + sb.append('^'); + } + } + + return sb.toString(); + } + + + public String toString() + { + StringBuffer sb = new StringBuffer(); + + sb.append(getReason()); + + if (getIndex() > -1) + { + if (_length != -1) + { + sb.append(" between indicies "); + sb.append(getIndex()); + sb.append(" and "); + sb.append(_length); + } + else + { + sb.append(" at index "); + sb.append(getIndex()); + } + } + + sb.append(" "); + if (getIndex() != -1) + { + sb.append("\n"); + } + + sb.append(getInput()); + + if (getIndex() != -1) + { + sb.append("\n"); + sb.append(getPositionString(getIndex(), _length)); + } + + return sb.toString(); + } + + +} diff --git a/java/common/stylesheets/framing.xsl b/java/common/stylesheets/framing.xsl new file mode 100644 index 0000000000..bb17a72bce --- /dev/null +++ b/java/common/stylesheets/framing.xsl @@ -0,0 +1,61 @@ +<?xml version='1.0'?> +<!-- + - + - Copyright (c) 2006 The Apache Software Foundation + - + - Licensed under the Apache License, Version 2.0 (the "License"); + - you may not use this file except in compliance with the License. + - You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, software + - distributed under the License is distributed on an "AS IS" BASIS, + - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + - See the License for the specific language governing permissions and + - limitations under the License. + - + --> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amq="http://amq.org"> + +<xsl:import href="prepare1.xsl"/> +<xsl:import href="prepare2.xsl"/> +<xsl:import href="prepare3.xsl"/> +<xsl:import href="java.xsl"/> + +<xsl:output indent="yes"/> +<xsl:output method="text" indent="yes" name="textFormat"/> + +<xsl:template match="/"> + <xsl:variable name="prepare1"> + <xsl:apply-templates mode="prepare1" select="."/> + </xsl:variable> + + <xsl:variable name="prepare2"> + <xsl:apply-templates mode="prepare2" select="$prepare1"/> + </xsl:variable> + + <xsl:variable name="model"> + <xsl:apply-templates mode="prepare3" select="$prepare2"/> + </xsl:variable> + + <xsl:apply-templates mode="generate-multi" select="$model"/> + <xsl:apply-templates mode="list-registry" select="$model"/> + + <!-- dump out the intermediary files for debugging --> + <!-- + <xsl:result-document href="prepare1.out"> + <xsl:copy-of select="$prepare1"/> + </xsl:result-document> + + <xsl:result-document href="prepare2.out"> + <xsl:copy-of select="$prepare2"/> + </xsl:result-document> + + <xsl:result-document href="model.out"> + <xsl:copy-of select="$model"/> + </xsl:result-document> + --> +</xsl:template> + +</xsl:stylesheet> diff --git a/java/common/stylesheets/java.xsl b/java/common/stylesheets/java.xsl new file mode 100644 index 0000000000..df72bfb0c0 --- /dev/null +++ b/java/common/stylesheets/java.xsl @@ -0,0 +1,247 @@ +<?xml version='1.0'?> +<!-- + - + - Copyright (c) 2006 The Apache Software Foundation + - + - Licensed under the Apache License, Version 2.0 (the "License"); + - you may not use this file except in compliance with the License. + - You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, software + - distributed under the License is distributed on an "AS IS" BASIS, + - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + - See the License for the specific language governing permissions and + - limitations under the License. + - + --> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amq="http://amq.org"> + +<!-- this class contains the templates for generating java source code for a given framing model --> +<xsl:import href="utils.xsl"/> +<xsl:output method="text" indent="yes" name="textFormat"/> + +<xsl:param name="major"/> +<xsl:param name="minor"/> +<xsl:param name="registry_name"/> +<xsl:param name="version_list_name"/> + +<xsl:template match="/"> + <xsl:apply-templates mode="generate-multi" select="frames"/> + <xsl:apply-templates mode="generate-registry" select="frames"/> +</xsl:template> + +<!-- processes all frames outputting the classes in a single stream --> +<!-- (useful for debugging etc) --> +<xsl:template match="frame" mode="generate-single"> + <xsl:call-template name="generate-class"> + <xsl:with-param name="f" select="."/> + </xsl:call-template> +</xsl:template> + +<!-- generates seperate file for each class/frame --> +<xsl:template match="frame" mode="generate-multi"> + <xsl:variable name="uri" select="concat(@name, '.java')"/> + wrote <xsl:value-of select="$uri"/> + <xsl:result-document href="{$uri}" format="textFormat"> + <xsl:call-template name="generate-class"> + <xsl:with-param name="f" select="."/> + </xsl:call-template> + </xsl:result-document> +</xsl:template> + +<!-- main class generation template --> +<xsl:template name="generate-class"> + <xsl:param name="f"/> + <xsl:value-of select="amq:copyright()"/> +<!-- package org.apache.qpid.framing_<xsl:value-of select="$major"/>_<xsl:value-of select="$minor"/>; --> +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.EncodableAMQDataBlock; +import org.apache.qpid.framing.EncodingUtils; +import org.apache.qpid.framing.FieldTable; + +/** + * This class is autogenerated, do not modify. [From <xsl:value-of select="$f/parent::frames/@protocol"/>] + */ +public class <xsl:value-of select="$f/@name"/> extends AMQMethodBody implements EncodableAMQDataBlock +{ + public static final int CLASS_ID = <xsl:value-of select="$f/@class-id"/>; + public static final int METHOD_ID = <xsl:value-of select="$f/@method-id"/>; + + <xsl:for-each select="$f/field"> + <xsl:text>public </xsl:text><xsl:value-of select="@java-type"/> + <xsl:text> </xsl:text> + <xsl:value-of select="@name"/>; + </xsl:for-each> + + protected int getClazz() + { + return <xsl:value-of select="$f/@class-id"/>; + } + + protected int getMethod() + { + return <xsl:value-of select="$f/@method-id"/>; + } + + protected int getBodySize() + { + <xsl:choose> + <xsl:when test="$f/field"> + return + <xsl:for-each select="$f/field"> + <xsl:if test="position() != 1">+ + </xsl:if> + <xsl:value-of select="amq:field-length(.)"/> + </xsl:for-each> + ; + </xsl:when> + <xsl:otherwise>return 0;</xsl:otherwise> + </xsl:choose> + } + + protected void writeMethodPayload(ByteBuffer buffer) + { + <xsl:for-each select="$f/field"> + <xsl:if test="@type != 'bit'"> + <xsl:value-of select="amq:encoder(.)"/>; + </xsl:if> + <xsl:if test="@type = 'bit' and @boolean-index = 1"> + <xsl:text>EncodingUtils.writeBooleans(buffer, new boolean[]{</xsl:text> + <xsl:value-of select="$f/field[@type='bit']/@name" separator=", "/>}); + </xsl:if> + </xsl:for-each> + } + + public void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException + { + <xsl:for-each select="$f/field"> + <xsl:value-of select="amq:decoder(.)"/>; + </xsl:for-each> + } + + public String toString() + { + StringBuffer buf = new StringBuffer(super.toString()); + <xsl:for-each select="$f/field"> + <xsl:text>buf.append(" </xsl:text><xsl:value-of select="@name"/>: ").append(<xsl:value-of select="@name"/>); + </xsl:for-each> + return buf.toString(); + } + + public static AMQFrame createAMQFrame(int channelId<xsl:if test="$f/field">, </xsl:if><xsl:value-of select="$f/field/concat(@java-type, ' ', @name)" separator=", "/>) + { + <xsl:value-of select="@name"/> body = new <xsl:value-of select="@name"/>(); + <xsl:for-each select="$f/field"> + <xsl:value-of select="concat('body.', @name, ' = ', @name)"/>; + </xsl:for-each> + AMQFrame frame = new AMQFrame(); + frame.channel = channelId; + frame.bodyFrame = body; + return frame; + } +} +</xsl:template> + +<xsl:template match="/" mode="generate-registry"> + <xsl:text>Matching root for registry mode!</xsl:text> + <xsl:value-of select="."/> + <xsl:apply-templates select="frames" mode="generate-registry"/> +</xsl:template> + +<xsl:template match="registries" mode="generate-registry"> +Wrote MethodBodyDecoderRegistry.java + <xsl:result-document href="MethodBodyDecoderRegistry.java" format="textFormat"> + <xsl:value-of select="amq:copyright()"/> +<!-- package org.apache.qpid.framing_<xsl:value-of select="$major"/>_<xsl:value-of select="$minor"/>; --> +package org.apache.qpid.framing; + +import java.util.Map; +import java.util.HashMap; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.framing.AMQMethodBody; + +/** + * This class is autogenerated, do not modify. + */ +public final class MethodBodyDecoderRegistry +{ + private static final Logger _log = Logger.getLogger(MethodBodyDecoderRegistry.class); + + private static final Map _classMethodProductToMethodBodyMap = new HashMap(); + + static + { + <xsl:for-each select="registry"> + <xsl:value-of select="concat(@name, '.register(_classMethodProductToMethodBodyMap)')"/>; + </xsl:for-each> + } + + public static AMQMethodBody get(int clazz, int method) throws AMQFrameDecodingException + { + Class bodyClass = (Class) _classMethodProductToMethodBodyMap.get(new Integer(clazz * 1000 + method)); + if (bodyClass != null) + { + try + { + return (AMQMethodBody) bodyClass.newInstance(); + } + catch (Exception e) + { + throw new AMQFrameDecodingException(_log, + "Unable to instantiate body class for class " + clazz + " and method " + method + ": " + e, e); + } + } + else + { + throw new AMQFrameDecodingException(_log, + "Unable to find a suitable decoder for class " + clazz + " and method " + method); + } + } +} +</xsl:result-document> +</xsl:template> + +<xsl:template match="frames" mode="list-registry"> + <xsl:if test="$registry_name"> + + <xsl:variable name="file" select="concat($registry_name, '.java')"/> + wrote <xsl:value-of select="$file"/> + <xsl:result-document href="{$file}" format="textFormat"> + <xsl:value-of select="amq:copyright()"/> +<!-- package org.apache.qpid.framing_<xsl:value-of select="$major"/>_<xsl:value-of select="$minor"/>; --> +package org.apache.qpid.framing; + +import java.util.Map; + +/** + * This class is autogenerated, do not modify. [From <xsl:value-of select="@protocol"/>] + */ +class <xsl:value-of select="$registry_name"/> +{ + static void register(Map map) + { + <xsl:for-each select="frame"> + <xsl:text>map.put(new Integer(</xsl:text> + <xsl:value-of select="@class-id"/> + <xsl:text> * 1000 + </xsl:text> + <xsl:value-of select="@method-id"/> + <xsl:text>), </xsl:text> + <xsl:value-of select="concat(@name, '.class')"/>); + </xsl:for-each> + } +} + </xsl:result-document> + + </xsl:if> +</xsl:template> + +</xsl:stylesheet> diff --git a/java/common/stylesheets/prepare1.xsl b/java/common/stylesheets/prepare1.xsl new file mode 100644 index 0000000000..5b7641e408 --- /dev/null +++ b/java/common/stylesheets/prepare1.xsl @@ -0,0 +1,111 @@ +<?xml version='1.0'?> +<!-- + - + - Copyright (c) 2006 The Apache Software Foundation + - + - Licensed under the Apache License, Version 2.0 (the "License"); + - you may not use this file except in compliance with the License. + - You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, software + - distributed under the License is distributed on an "AS IS" BASIS, + - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + - See the License for the specific language governing permissions and + - limitations under the License. + - + --> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amq="http://amq.org"> + +<xsl:import href="utils.xsl"/> + +<xsl:output indent="yes"/> +<xsl:param name="asl_base"/> + +<!-- pre-process, phase 1 --> + +<xsl:template match="/"> + <xsl:apply-templates select="protocol" mode="prepare1"/> +</xsl:template> + +<xsl:template match="amqp" mode="prepare1"> + <frames> + <xsl:attribute name="protocol"> + <xsl:value-of select="@comment"/> + <xsl:text> (</xsl:text> + <xsl:text>major=</xsl:text><xsl:value-of select="@major"/> + <xsl:text>, minor=</xsl:text><xsl:value-of select="@minor"/> + <xsl:text>)</xsl:text> + </xsl:attribute> + <xsl:attribute name="major"> + <xsl:value-of select="@major"/> + </xsl:attribute> + <xsl:attribute name="minor"> + <xsl:value-of select="@minor"/> + </xsl:attribute> + <xsl:apply-templates mode="prepare1" select="inherit"/> + <xsl:apply-templates mode="prepare1" select="include"/> + <xsl:apply-templates mode="prepare1" select="domain"/> + <xsl:apply-templates mode="prepare1" select="class"/> + </frames> +</xsl:template> + +<xsl:template match="include" mode="prepare1"> + <xsl:if test="@filename != 'asl_constants.asl'"> + <!-- skip asl_constants.asl, we don't need it and it is not well formed so causes error warnings --> + <xsl:apply-templates select="document(@filename)" mode="prepare1"/> + </xsl:if> +</xsl:template> + +<xsl:template match="inherit" mode="prepare1"> + <xsl:variable name="ibase" select="concat('file:///', $asl_base, '/', @name, '.asl')"/> + <xsl:choose> + <xsl:when test="document($ibase)"> + <xsl:apply-templates select="document($ibase)" mode="prepare1"/> + </xsl:when> + <xsl:otherwise> + <xsl:message> + Could not inherit from <xsl:value-of select="$ibase"/>; file not found. + </xsl:message> + </xsl:otherwise> + </xsl:choose> +</xsl:template> + +<xsl:template match="class[@index]" mode="prepare1"> + <xsl:apply-templates select="method" mode="prepare1"/> +</xsl:template> + +<xsl:template match="method" mode="prepare1"> + <xsl:if test="parent::class[@index]"><!-- there is a template class that has no index, which we want to skip --> + <frame> + <xsl:attribute name="name"><xsl:value-of select="amq:class-name(parent::class/@name, @name)"/></xsl:attribute> + <xsl:attribute name="class-id"><xsl:value-of select="parent::class/@index"/></xsl:attribute> + <xsl:if test="@index"> + <xsl:attribute name="method-id"><xsl:value-of select="@index"/></xsl:attribute> + </xsl:if> + <xsl:if test="not(@index)"> + <xsl:attribute name="method-id"><xsl:number count="method"/></xsl:attribute> + </xsl:if> + + <xsl:apply-templates select="field" mode="prepare1"/> + </frame> + </xsl:if> +</xsl:template> + +<xsl:template match="domain" mode="prepare1"> + <domain> + <name><xsl:value-of select="@name"/></name> + <type><xsl:value-of select="@type"/></type> + </domain> +</xsl:template> + +<xsl:template match="field" mode="prepare1"> + <field> + <xsl:copy-of select="@name"/> + <xsl:copy-of select="@type"/> + <xsl:copy-of select="@domain"/> + </field> +</xsl:template> + +</xsl:stylesheet> diff --git a/java/common/stylesheets/prepare2.xsl b/java/common/stylesheets/prepare2.xsl new file mode 100644 index 0000000000..38110b0a33 --- /dev/null +++ b/java/common/stylesheets/prepare2.xsl @@ -0,0 +1,66 @@ +<?xml version='1.0'?> +<!-- + - + - Copyright (c) 2006 The Apache Software Foundation + - + - Licensed under the Apache License, Version 2.0 (the "License"); + - you may not use this file except in compliance with the License. + - You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, software + - distributed under the License is distributed on an "AS IS" BASIS, + - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + - See the License for the specific language governing permissions and + - limitations under the License. + - + --> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amq="http://amq.org"> + +<xsl:import href="utils.xsl"/> + +<xsl:output indent="yes"/> + +<!-- pre-process, phase 2 --> + +<xsl:key name="domain-lookup" match="domain" use="name"/> + +<xsl:template match="/"> + <xsl:apply-templates mode="prepare2" select="frames"/> +</xsl:template> + +<xsl:template match="field[@domain]" mode="prepare2"> + <field> + <xsl:variable name="t1" select="key('domain-lookup', @domain)/type"/> + <xsl:attribute name="name"><xsl:value-of select="amq:field-name(@name)"/></xsl:attribute> + <xsl:attribute name="type"><xsl:value-of select="$t1"/></xsl:attribute> + </field> +</xsl:template> + +<xsl:template match="field[@type]" mode="prepare2"> + <field> + <xsl:attribute name="name"><xsl:value-of select="amq:field-name(@name)"/></xsl:attribute> + <xsl:attribute name="type"><xsl:value-of select="@type"/></xsl:attribute> + </field> +</xsl:template> + +<xsl:template match="frames" mode="prepare2"> + <frames> + <xsl:copy-of select="@protocol"/> + <xsl:copy-of select="@major"/> + <xsl:copy-of select="@minor"/> + <xsl:apply-templates mode="prepare2"/> + </frames> +</xsl:template> + +<xsl:template match="frame" mode="prepare2"> + <xsl:element name="{name()}"> + <xsl:copy-of select="@*"/> + <xsl:apply-templates mode="prepare2" select="field"/> + </xsl:element> +</xsl:template> + +<xsl:template match="domain" mode="prepare2"></xsl:template> + +</xsl:stylesheet> diff --git a/java/common/stylesheets/prepare3.xsl b/java/common/stylesheets/prepare3.xsl new file mode 100644 index 0000000000..6d6d75d8f0 --- /dev/null +++ b/java/common/stylesheets/prepare3.xsl @@ -0,0 +1,62 @@ +<?xml version='1.0'?> +<!-- + - + - Copyright (c) 2006 The Apache Software Foundation + - + - Licensed under the Apache License, Version 2.0 (the "License"); + - you may not use this file except in compliance with the License. + - You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, software + - distributed under the License is distributed on an "AS IS" BASIS, + - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + - See the License for the specific language governing permissions and + - limitations under the License. + - + --> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amq="http://amq.org"> + +<xsl:import href="utils.xsl"/> + +<xsl:output indent="yes"/> + +<!-- final preparation of the model --> + +<xsl:template match="/"> + <xsl:apply-templates mode="prepare3"/> +</xsl:template> + +<xsl:template match="frames" mode="prepare3"> + <frames> + <xsl:copy-of select="@protocol"/> + <xsl:copy-of select="@major"/> + <xsl:copy-of select="@minor"/> + <xsl:apply-templates mode="prepare3"/> + </frames> +</xsl:template> + +<xsl:template match="frame" mode="prepare3"> + <xsl:element name="frame"> + <xsl:copy-of select="@*"/> + <xsl:if test="field[@type='bit']"><xsl:attribute name="has-bit-field">true</xsl:attribute></xsl:if> + <xsl:apply-templates mode="prepare3"/> + </xsl:element> +</xsl:template> + + +<xsl:template match="field" mode="prepare3"> + <field> + <xsl:attribute name="type"><xsl:value-of select="@type"/></xsl:attribute> + <!-- ensure the field name is processed to be a valid java name --> + <xsl:attribute name="name"><xsl:value-of select="amq:field-name(@name)"/></xsl:attribute> + <!-- add some attributes to make code generation easier --> + <xsl:attribute name="java-type"><xsl:value-of select="amq:java-type(@type)"/></xsl:attribute> + <xsl:if test="@type='bit'"> + <xsl:attribute name="boolean-index"><xsl:number count="field[@type='bit']"/></xsl:attribute> + </xsl:if> + </field> +</xsl:template> + +</xsl:stylesheet> diff --git a/java/common/stylesheets/readme.txt b/java/common/stylesheets/readme.txt new file mode 100644 index 0000000000..b373055df9 --- /dev/null +++ b/java/common/stylesheets/readme.txt @@ -0,0 +1,52 @@ +This directory contains the xsl stylesheets used to generate the code from the +AMQP protocol specification. They require an XSLT2.0 processor, currently +Saxon 8 is used. + +The generation process is controlled by the framing.xsl stylesheet. This performs +several phases of transformation, using the other stylesheets. The transformation +in each phase is defined in a separate file, and these are designed to also allow +then to be run individually. + +The generation takes the amq.asl as input, it also requires that the path to the +directory where the base asl definitions reside (those definitions that the main +amq.asl defintion inherits from) be passed in via a paramter called asl_base. + +The files involved are as follows: + + framing.xsl The control file for the entire generation process + + prepare1.xsl Resolves the separate files that make up the protocol + definition, building a single tree containing all the + information as a set of 'frame' elements, each of which + has attributes for its name, and ids for the class and + method it refers to and contains zero or more field + elements. + + A method id is generated based on the order of the + method elements within the class elements in the original + specification. The class id is taken from the enclosing + class element. + + prepare2.xsl Resolves domains into their corresponding types. (This is + much easier when all the information is in a single tree, + hence the separate frame). + + prepare3.xsl Converts names into valid java names and augments the + tree to include information that makes the subsequent + generation phase simpler e.g. the index of boolean + fields as several boolean flags are combined into a + single byte. (This is easier once the domains have been + resolved, hence the separate phase). + + java.xsl Generates java classes for each frame, and a registry of + all the frames to a 'magic' number generated from their + class and method id. + + utils.xsl Contains some utility methods for e.g. producing valid + java names. + +For debugging the framing.xsl can output the intermediary files. This can be +enabled by uncommenting the relevant lines (a comment explaining this is +provided inline). + +
\ No newline at end of file diff --git a/java/common/stylesheets/registry.xsl b/java/common/stylesheets/registry.xsl new file mode 100644 index 0000000000..eb382c296b --- /dev/null +++ b/java/common/stylesheets/registry.xsl @@ -0,0 +1,29 @@ +<?xml version='1.0'?> +<!-- + - + - Copyright (c) 2006 The Apache Software Foundation + - + - Licensed under the Apache License, Version 2.0 (the "License"); + - you may not use this file except in compliance with the License. + - You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, software + - distributed under the License is distributed on an "AS IS" BASIS, + - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + - See the License for the specific language governing permissions and + - limitations under the License. + - + --> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amq="http://amq.org"> + +<xsl:import href="java.xsl"/> + +<xsl:output method="text" indent="yes" name="textFormat"/> + +<xsl:template match="/"> + <xsl:apply-templates mode="generate-registry" select="registries"/> +</xsl:template> + +</xsl:stylesheet> diff --git a/java/common/stylesheets/utils.xsl b/java/common/stylesheets/utils.xsl new file mode 100644 index 0000000000..e6fda6c9d5 --- /dev/null +++ b/java/common/stylesheets/utils.xsl @@ -0,0 +1,201 @@ +<?xml version='1.0'?> +<!-- + - + - Copyright (c) 2006 The Apache Software Foundation + - + - Licensed under the Apache License, Version 2.0 (the "License"); + - you may not use this file except in compliance with the License. + - You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, software + - distributed under the License is distributed on an "AS IS" BASIS, + - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + - See the License for the specific language governing permissions and + - limitations under the License. + - + --> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amq="http://amq.org"> + +<!-- This file contains functions that are used in the generation of the java classes for framing --> + +<!-- create copyright notice for generated files --> +<xsl:function name="amq:copyright">/** +* +* Copyright (c) 2006 The Apache Software Foundation +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +*/ +</xsl:function> + +<!-- retrieve the java type of a given amq type --> +<xsl:function name="amq:java-type"> + <xsl:param name="t"/> + <xsl:choose> + <xsl:when test="$t='char'">char</xsl:when> + <xsl:when test="$t='octet'">short</xsl:when> + <xsl:when test="$t='short'">int</xsl:when> + <xsl:when test="$t='shortstr'">String</xsl:when> + <xsl:when test="$t='longstr'">byte[]</xsl:when> + <xsl:when test="$t='bit'">boolean</xsl:when> + <xsl:when test="$t='long'">long</xsl:when> + <xsl:when test="$t='longlong'">long</xsl:when> + <xsl:when test="$t='table'">FieldTable</xsl:when> + <xsl:otherwise>Object /*WARNING: undefined type*/</xsl:otherwise> + </xsl:choose> +</xsl:function> + +<!-- retrieve the code to get the field size of a given amq type --> +<xsl:function name="amq:field-length"> + <xsl:param name="f"/> + <xsl:choose> + <xsl:when test="$f/@type='bit' and $f/@boolean-index=1"> + <xsl:value-of select="concat('1 /*', $f/@name, '*/')"/> + </xsl:when> + <xsl:when test="$f/@type='bit' and $f/@boolean-index > 1"> + <xsl:value-of select="concat('0 /*', $f/@name, '*/')"/> + </xsl:when> + <xsl:when test="$f/@type='char'"> + <xsl:value-of select="concat('1 /*', $f/@name, '*/')"/> + </xsl:when> + <xsl:when test="$f/@type='octet'"> + <xsl:value-of select="concat('1 /*', $f/@name, '*/')"/> + </xsl:when> + <xsl:when test="$f/@type='short'"> + <xsl:value-of select="concat('2 /*', $f/@name, '*/')"/> + </xsl:when> + <xsl:when test="$f/@type='long'"> + <xsl:value-of select="concat('4 /*', $f/@name, '*/')"/> + </xsl:when> + <xsl:when test="$f/@type='longlong'"> + <xsl:value-of select="concat('8 /*', $f/@name, '*/')"/> + </xsl:when> + <xsl:when test="$f/@type='shortstr'"> + <xsl:value-of select="concat('EncodingUtils.encodedShortStringLength(', $f/@name, ')')"/> + </xsl:when> + <xsl:when test="$f/@type='longstr'"> + <xsl:value-of select="concat('4 + (', $f/@name, ' == null ? 0 : ', $f/@name, '.length)')"/> + </xsl:when> + <xsl:when test="$f/@type='table'"> + <xsl:value-of select="concat('EncodingUtils.encodedFieldTableLength(', $f/@name, ')')"/> + </xsl:when> + <xsl:otherwise><xsl:text>/* WARNING: COULD NOT DETERMINE FIELD SIZE */</xsl:text></xsl:otherwise> + </xsl:choose> +</xsl:function> + +<!-- retrieve the code to encode a field of a given amq type --> +<!-- Note: + This method will not provide an encoder for a bit field. + Bit fields should be encoded together separately. --> + +<xsl:function name="amq:encoder"> + <xsl:param name="f"/> + <xsl:choose> + <xsl:when test="$f/@type='char'"> + <xsl:value-of select="concat('EncodingUtils.writeChar(buffer, ', $f/@name, ')')"/> + </xsl:when> + <xsl:when test="$f/@type='octet'"> + <xsl:value-of select="concat('EncodingUtils.writeUnsignedByte(buffer, ', $f/@name, ')')"/> + </xsl:when> + <xsl:when test="$f/@type='short'"> + <xsl:value-of select="concat('EncodingUtils.writeUnsignedShort(buffer, ', $f/@name, ')')"/> + </xsl:when> + <xsl:when test="$f/@type='long'"> + <xsl:value-of select="concat('EncodingUtils.writeUnsignedInteger(buffer, ', $f/@name, ')')"/> + </xsl:when> + <xsl:when test="$f/@type='longlong'"> + <xsl:value-of select="concat('buffer.putLong(', $f/@name, ')')"/> + </xsl:when> + <xsl:when test="$f/@type='shortstr'"> + <xsl:value-of select="concat('EncodingUtils.writeShortStringBytes(buffer, ', $f/@name, ')')"/> + </xsl:when> + <xsl:when test="$f/@type='longstr'"> + <xsl:value-of select="concat('EncodingUtils.writeLongstr(buffer, ', $f/@name, ')')"/> + </xsl:when> + <xsl:when test="$f/@type='table'"> + <xsl:value-of select="concat('EncodingUtils.writeFieldTableBytes(buffer, ', $f/@name, ')')"/> + </xsl:when> + <xsl:otherwise><xsl:text>/* WARNING: COULD NOT DETERMINE ENCODER */</xsl:text></xsl:otherwise> + </xsl:choose> +</xsl:function> + +<!-- retrieve the code to decode a field of a given amq type --> +<xsl:function name="amq:decoder"> + <xsl:param name="f"/> + <xsl:choose> + <xsl:when test="$f/@type='bit'"> + <xsl:if test="$f/@boolean-index = 1"> + <xsl:text>boolean[] bools = EncodingUtils.readBooleans(buffer);</xsl:text> + </xsl:if> + <xsl:value-of select="concat($f/@name, ' = bools[', $f/@boolean-index - 1 , ']')"/> + </xsl:when> + <xsl:when test="$f/@type='char'"> + <xsl:value-of select="concat($f/@name, ' = buffer.getChar()')"/> + </xsl:when> + <xsl:when test="$f/@type='octet'"> + <xsl:value-of select="concat($f/@name, ' = buffer.getUnsigned()')"/> + </xsl:when> + <xsl:when test="$f/@type='short'"> + <xsl:value-of select="concat($f/@name, ' = buffer.getUnsignedShort()')"/> + </xsl:when> + <xsl:when test="$f/@type='long'"> + <xsl:value-of select="concat($f/@name, ' = buffer.getUnsignedInt()')"/> + </xsl:when> + <xsl:when test="$f/@type='longlong'"> + <xsl:value-of select="concat($f/@name, ' = buffer.getLong()')"/> + </xsl:when> + <xsl:when test="$f/@type='shortstr'"> + <xsl:value-of select="concat($f/@name, ' = EncodingUtils.readShortString(buffer)')"/> + </xsl:when> + <xsl:when test="$f/@type='longstr'"> + <xsl:value-of select="concat($f/@name, ' = EncodingUtils.readLongstr(buffer)')"/> + </xsl:when> + <xsl:when test="$f/@type='table'"> + <xsl:value-of select="concat($f/@name, ' = EncodingUtils.readFieldTable(buffer)')"/> + </xsl:when> + <xsl:otherwise><xsl:text>/* WARNING: COULD NOT DETERMINE DECODER */</xsl:text></xsl:otherwise> + </xsl:choose> +</xsl:function> + +<!-- create the class name for a frame, based on class and method (passed in) --> +<xsl:function name="amq:class-name"> + <xsl:param name="class"/> + <xsl:param name="method"/> + <xsl:value-of select="concat(amq:upper-first($class),amq:upper-first(amq:field-name($method)), 'Body')"/> +</xsl:function> + +<!-- get a valid field name, processing spaces and '-'s where appropriate --> +<xsl:function name="amq:field-name"> + <xsl:param name="name"/> + <xsl:choose> + <xsl:when test="contains($name, ' ')"> + <xsl:value-of select="concat(substring-before($name, ' '), amq:upper-first(substring-after($name, ' ')))"/> + </xsl:when> + <xsl:when test="contains($name, '-')"> + <xsl:value-of select="concat(substring-before($name, '-'), amq:upper-first(substring-after($name, '-')))"/> + </xsl:when> + <xsl:otherwise> + <xsl:value-of select="$name"/> + </xsl:otherwise> + </xsl:choose> +</xsl:function> + +<!-- convert the first character of the input to upper-case --> +<xsl:function name="amq:upper-first"> + <xsl:param name="in"/> + <xsl:value-of select="concat(upper-case(substring($in, 1, 1)), substring($in, 2))"/> +</xsl:function> + +</xsl:stylesheet> |