diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2009-07-20 19:00:14 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2009-07-20 19:00:14 +0000 |
commit | 94a235124bceb2b54d16bba52c9eb8fb32932b27 (patch) | |
tree | 4031ec381dade456996b8d276b65fd141332e0c9 | |
parent | 4ef4a155256b458e76769cc956e5a3bdaad90577 (diff) | |
download | qpid-python-94a235124bceb2b54d16bba52c9eb8fb32932b27.tar.gz |
Java Broker 0-10 exploratory work
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@795956 13f79535-47bb-0310-9956-ffa450edef68
9 files changed, 591 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java new file mode 100644 index 0000000000..3e79bc2260 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.message; + +import java.util.Set; + +public interface AMQMessageHeader +{ + String getCorrelationId(); + + long getExpiration(); + + String getMessageId(); + + byte getPriority(); + + long getTimestamp(); + + String getType(); + + String getReplyTo(); + + Object getHeader(String name); + + boolean containsHeaders(Set<String> names); + + boolean containsHeader(String name); + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageReference.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageReference.java new file mode 100644 index 0000000000..2ba5eb04d9 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageReference.java @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.message; + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.MessageCleanupException; + +public class AMQMessageReference extends MessageReference<AMQMessage> +{ + public AMQMessageReference(AMQMessage message) + { + super(message); + } + + protected void onReference() + { + getMessage().incrementReference(); + } + + protected void onRelease() + { + try + { + getMessage().decrementReference(); + } + catch (MessageCleanupException e) + { + // TODO + throw new RuntimeException(e); + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java new file mode 100644 index 0000000000..37e7bcc566 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.message; + +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.FieldTable; + +import java.util.Set; + +public class ContentHeaderBodyAdapter implements AMQMessageHeader +{ + private final ContentHeaderBody _contentHeaderBody; + + public ContentHeaderBodyAdapter(ContentHeaderBody contentHeaderBody) + { + _contentHeaderBody = contentHeaderBody; + } + + private BasicContentHeaderProperties getProperties() + { + return (BasicContentHeaderProperties) _contentHeaderBody.properties; + } + + public String getCorrelationId() + { + return getProperties().getCorrelationIdAsString(); + } + + public long getExpiration() + { + return getProperties().getExpiration(); + } + + public String getMessageId() + { + return getProperties().getMessageIdAsString(); + } + + public byte getPriority() + { + return getProperties().getPriority(); + } + + public long getTimestamp() + { + return getProperties().getTimestamp(); + } + + public String getType() + { + return getProperties().getTypeAsString(); + } + + public String getReplyTo() + { + return getProperties().getReplyToAsString(); + } + + public Object getHeader(String name) + { + FieldTable ft = getProperties().getHeaders(); + return ft.get(name); + } + + public boolean containsHeaders(Set<String> names) + { + FieldTable ft = getProperties().getHeaders(); + for(String name : names) + { + if(!ft.containsKey(name)) + { + return false; + } + } + return true; + } + + public boolean containsHeader(String name) + { + FieldTable ft = getProperties().getHeaders(); + return ft.containsKey(name); + } + + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/InboundMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/InboundMessage.java new file mode 100644 index 0000000000..1b3fdb1870 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/InboundMessage.java @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.message; + + +import org.apache.qpid.server.queue.Filterable; + +public interface InboundMessage extends Filterable +{ + String getRoutingKey(); + + AMQMessageHeader getMessageHeader(); + + boolean isPersistent(); + + boolean isRedelivered(); + + long getSize(); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageReference.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageReference.java new file mode 100644 index 0000000000..7974c40b04 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageReference.java @@ -0,0 +1,67 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.message; + +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +public abstract class MessageReference<M extends ServerMessage> +{ + + private static final AtomicReferenceFieldUpdater<MessageReference, ServerMessage> _messageUpdater = + AtomicReferenceFieldUpdater.newUpdater(MessageReference.class, ServerMessage.class,"_message"); + + private volatile M _message; + + public MessageReference(M message) + { + _message = message; + onReference(); + } + + abstract protected void onReference(); + + abstract protected void onRelease(); + + public M getMessage() + { + return _message; + } + + public void release() + { + ServerMessage message = _messageUpdater.getAndSet(this,null); + if(message != null) + { + onRelease(); + } + } + + @Override + protected void finalize() throws Throwable + { + if(_message != null) + { + onRelease(); + _message = null; + } + super.finalize(); //To change body of overridden methods use File | Settings | File Templates. + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java new file mode 100644 index 0000000000..0c03cb0ea7 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java @@ -0,0 +1,102 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.message; + +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.MessageDeliveryPriority; + +import java.util.Set; +import java.util.Map; + +class MessageTransferHeader implements AMQMessageHeader +{ + + + public static final String JMS_TYPE = "x-jms-type"; + + private final DeliveryProperties _deliveryProps; + private final MessageProperties _messageProps; + + public MessageTransferHeader(DeliveryProperties deliveryProps, MessageProperties messageProps) + { + _deliveryProps = deliveryProps; + _messageProps = messageProps; + } + + public String getCorrelationId() + { + return _messageProps == null ? null : new String(_messageProps.getCorrelationId()); + } + + public long getExpiration() + { + return _deliveryProps == null ? null : _deliveryProps.getExpiration(); + } + + public String getMessageId() + { + return _messageProps == null ? null : String.valueOf(_messageProps.getMessageId()); + } + + public byte getPriority() + { + MessageDeliveryPriority priority = _deliveryProps == null + ? MessageDeliveryPriority.MEDIUM + : _deliveryProps.getPriority(); + return (byte) priority.getValue(); + } + + public long getTimestamp() + { + return _deliveryProps == null ? 0L : _deliveryProps.getTimestamp(); + } + + public String getType() + { + Object type = getHeader(JMS_TYPE); + return type instanceof String ? (String) type : null; + } + + public String getReplyTo() + { + return _messageProps == null ? null : _messageProps.getReplyTo().toString(); + } + + public Object getHeader(String name) + { + Map<String, Object> appHeaders = _messageProps == null ? null : _messageProps.getApplicationHeaders(); + return appHeaders == null ? null : appHeaders.get(name); + } + + public boolean containsHeaders(Set<String> names) + { + Map<String, Object> appHeaders = _messageProps == null ? null : _messageProps.getApplicationHeaders(); + return appHeaders != null && appHeaders.keySet().containsAll(names); + + } + + public boolean containsHeader(String name) + { + Map<String, Object> appHeaders = _messageProps == null ? null : _messageProps.getApplicationHeaders(); + return appHeaders != null && appHeaders.containsKey(name); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java new file mode 100644 index 0000000000..38b647bfd1 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.message; + +import org.apache.qpid.transport.*; + +import java.util.concurrent.atomic.AtomicLong; + + +public class MessageTransferMessage implements InboundMessage, ServerMessage +{ + private static final AtomicLong _numberSource = new AtomicLong(0L); + + private final MessageTransfer _xfr; + private final DeliveryProperties _deliveryProps; + private final MessageProperties _messageProps; + private final AMQMessageHeader _messageHeader; + private final long _messageNumber; + private final long _arrivalTime; + + public MessageTransferMessage(MessageTransfer xfr) + { + _xfr = xfr; + _messageNumber = _numberSource.getAndIncrement(); + Header header = _xfr.getHeader(); + _deliveryProps = header.get(DeliveryProperties.class); + _messageProps = header.get(MessageProperties.class); + _messageHeader = new MessageTransferHeader(_deliveryProps, _messageProps); + _arrivalTime = System.currentTimeMillis(); + } + + public String getRoutingKey() + { + return _deliveryProps == null ? null : _deliveryProps.getRoutingKey(); + } + + public AMQMessageHeader getMessageHeader() + { + return _messageHeader; + } + + public boolean isPersistent() + { + return (_deliveryProps != null) && (_deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT); + } + + public boolean isRedelivered() + { + return false; + } + + public long getSize() + { + return _messageProps.getContentLength(); + } + + public boolean isImmediate() + { + return _deliveryProps != null && _deliveryProps.getImmediate(); + } + + public long getExpiration() + { + return _deliveryProps == null ? 0L : _deliveryProps.getExpiration(); + } + + public MessageReference newReference() + { + return new TransferMessageReference(this); + } + + public Long getMessageNumber() + { + return _messageNumber; + } + + public long getArrivalTime() + { + return _arrivalTime; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java new file mode 100644 index 0000000000..e5d15e29f9 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.message; + +import org.apache.qpid.server.queue.Filterable; + +public interface ServerMessage extends Filterable +{ + String getRoutingKey(); + + AMQMessageHeader getMessageHeader(); + + boolean isPersistent(); + + boolean isRedelivered(); + + long getSize(); + + boolean isImmediate(); + + long getExpiration(); + + MessageReference newReference(); + + Long getMessageNumber(); + + long getArrivalTime(); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java new file mode 100644 index 0000000000..784d47a840 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.message; + +public class TransferMessageReference extends MessageReference<MessageTransferMessage> +{ + public TransferMessageReference(MessageTransferMessage message) + { + super(message); + } + + protected void onReference() + { + + } + + protected void onRelease() + { + + } +} |