diff options
Diffstat (limited to 'trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java')
-rw-r--r-- | trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java | 188 |
1 files changed, 0 insertions, 188 deletions
diff --git a/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java b/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java deleted file mode 100644 index db84b83adb..0000000000 --- a/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid; - -import org.apache.qpid.transport.*; -import org.apache.qpid.transport.network.mina.MinaHandler; - -import static org.apache.qpid.transport.util.Functions.str; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; - - -/** - * ToyBroker - * - * @author Rafael H. Schloming - */ - -class ToyBroker extends SessionDelegate -{ - - private ToyExchange exchange; - private Map<String,Consumer> consumers = new ConcurrentHashMap<String,Consumer>(); - - public ToyBroker(ToyExchange exchange) - { - this.exchange = exchange; - } - - public void messageAcquire(Session context, MessageAcquire struct) - { - System.out.println("\n==================> messageAcquire " ); - context.executionResult((int) struct.getId(), new Acquired(struct.getTransfers())); - } - - @Override public void queueDeclare(Session ssn, QueueDeclare qd) - { - exchange.createQueue(qd.getQueue()); - System.out.println("\n==================> declared queue: " + qd.getQueue() + "\n"); - } - - @Override public void exchangeBind(Session ssn, ExchangeBind qb) - { - exchange.bindQueue(qb.getExchange(), qb.getBindingKey(),qb.getQueue()); - System.out.println("\n==================> bound queue: " + qb.getQueue() + " with binding key " + qb.getBindingKey() + "\n"); - } - - @Override public void queueQuery(Session ssn, QueueQuery qq) - { - QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue()); - ssn.executionResult((int) qq.getId(), result); - } - - @Override public void messageSubscribe(Session ssn, MessageSubscribe ms) - { - Consumer c = new Consumer(); - c._queueName = ms.getQueue(); - consumers.put(ms.getDestination(),c); - System.out.println("\n==================> message subscribe : " + ms.getDestination() + " queue: " + ms.getQueue() + "\n"); - } - - @Override public void messageFlow(Session ssn,MessageFlow struct) - { - Consumer c = consumers.get(struct.getDestination()); - c._credit = struct.getValue(); - System.out.println("\n==================> message flow : " + struct.getDestination() + " credit: " + struct.getValue() + "\n"); - } - - @Override public void messageFlush(Session ssn,MessageFlush struct) - { - System.out.println("\n==================> message flush for consumer : " + struct.getDestination() + "\n"); - checkAndSendMessagesToConsumer(ssn,struct.getDestination()); - } - - @Override public void messageTransfer(Session ssn, MessageTransfer xfr) - { - String dest = xfr.getDestination(); - System.out.println("received transfer " + dest); - Header header = xfr.getHeader(); - DeliveryProperties props = header.get(DeliveryProperties.class); - if (props != null) - { - System.out.println("received headers routing_key " + props.getRoutingKey()); - } - MessageProperties mp = header.get(MessageProperties.class); - System.out.println("MP: " + mp); - if (mp != null) - { - System.out.println(mp.getApplicationHeaders()); - } - - if (exchange.route(dest,props.getRoutingKey(),xfr)) - { - System.out.println("queued " + xfr); - dispatchMessages(ssn); - } - else - { - - if (props == null || !props.getDiscardUnroutable()) - { - RangeSet ranges = new RangeSet(); - ranges.add(xfr.getId()); - ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE, - "no such destination"); - } - } - ssn.processed(xfr); - } - - private void transferMessageToPeer(Session ssn,String dest, MessageTransfer m) - { - System.out.println("\n==================> Transfering message to: " +dest + "\n"); - ssn.messageTransfer(m.getDestination(), MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED, - m.getHeader(), m.getBody()); - } - - private void dispatchMessages(Session ssn) - { - for (String dest: consumers.keySet()) - { - checkAndSendMessagesToConsumer(ssn,dest); - } - } - - private void checkAndSendMessagesToConsumer(Session ssn,String dest) - { - Consumer c = consumers.get(dest); - LinkedBlockingQueue<MessageTransfer> queue = exchange.getQueue(c._queueName); - MessageTransfer m = queue.poll(); - while (m != null && c._credit>0) - { - transferMessageToPeer(ssn,dest,m); - c._credit--; - m = queue.poll(); - } - } - - // ugly, but who cares :) - // assumes unit is always no of messages, not bytes - // assumes it's credit mode and not window - private class Consumer - { - long _credit; - String _queueName; - } - - public static final void main(String[] args) throws IOException - { - final ToyExchange exchange = new ToyExchange(); - ConnectionDelegate delegate = new ServerDelegate() - { - public SessionDelegate getSessionDelegate() - { - return new ToyBroker(exchange); - } - }; - - MinaHandler.accept("0.0.0.0", 5672, delegate); - } - -} |