diff options
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java')
-rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java | 208 |
1 files changed, 208 insertions, 0 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java b/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java new file mode 100644 index 0000000000..5423bbb68f --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java @@ -0,0 +1,208 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid; + +import org.apache.qpid.transport.*; +import org.apache.qpid.transport.network.mina.MinaHandler; + +import static org.apache.qpid.transport.util.Functions.str; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; + + +/** + * ToyBroker + * + * @author Rafael H. Schloming + */ + +class ToyBroker extends SessionDelegate +{ + + private ToyExchange exchange; + private Map<String,Consumer> consumers = new ConcurrentHashMap<String,Consumer>(); + + public ToyBroker(ToyExchange exchange) + { + this.exchange = exchange; + } + + public void messageAcquire(Session context, MessageAcquire struct) + { + System.out.println("\n==================> messageAcquire " ); + context.executionResult((int) struct.getId(), new Acquired(struct.getTransfers())); + } + + @Override public void queueDeclare(Session ssn, QueueDeclare qd) + { + exchange.createQueue(qd.getQueue()); + System.out.println("\n==================> declared queue: " + qd.getQueue() + "\n"); + } + + @Override public void exchangeBind(Session ssn, ExchangeBind qb) + { + exchange.bindQueue(qb.getExchange(), qb.getBindingKey(),qb.getQueue()); + System.out.println("\n==================> bound queue: " + qb.getQueue() + " with binding key " + qb.getBindingKey() + "\n"); + } + + @Override public void queueQuery(Session ssn, QueueQuery qq) + { + QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue()); + ssn.executionResult((int) qq.getId(), result); + } + + @Override public void messageSubscribe(Session ssn, MessageSubscribe ms) + { + Consumer c = new Consumer(); + c._queueName = ms.getQueue(); + consumers.put(ms.getDestination(),c); + System.out.println("\n==================> message subscribe : " + ms.getDestination() + " queue: " + ms.getQueue() + "\n"); + } + + @Override public void messageFlow(Session ssn,MessageFlow struct) + { + Consumer c = consumers.get(struct.getDestination()); + c._credit = struct.getValue(); + System.out.println("\n==================> message flow : " + struct.getDestination() + " credit: " + struct.getValue() + "\n"); + } + + @Override public void messageFlush(Session ssn,MessageFlush struct) + { + System.out.println("\n==================> message flush for consumer : " + struct.getDestination() + "\n"); + checkAndSendMessagesToConsumer(ssn,struct.getDestination()); + } + + @Override public void messageTransfer(Session ssn, MessageTransfer xfr) + { + String dest = xfr.getDestination(); + System.out.println("received transfer " + dest); + Header header = xfr.getHeader(); + DeliveryProperties props = header.get(DeliveryProperties.class); + if (props != null) + { + System.out.println("received headers routing_key " + props.getRoutingKey()); + } + + MessageProperties mp = header.get(MessageProperties.class); + System.out.println("MP: " + mp); + if (mp != null) + { + System.out.println(mp.getApplicationHeaders()); + } + + if (exchange.route(dest,props == null ? null : props.getRoutingKey(),xfr)) + { + System.out.println("queued " + xfr); + dispatchMessages(ssn); + } + else + { + + if (props == null || !props.getDiscardUnroutable()) + { + RangeSet ranges = new RangeSet(); + ranges.add(xfr.getId()); + ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE, + "no such destination"); + } + } + ssn.processed(xfr); + } + + private void transferMessageToPeer(Session ssn,String dest, MessageTransfer m) + { + System.out.println("\n==================> Transfering message to: " +dest + "\n"); + ssn.messageTransfer(m.getDestination(), MessageAcceptMode.EXPLICIT, + MessageAcquireMode.PRE_ACQUIRED, + m.getHeader(), m.getBody()); + } + + private void dispatchMessages(Session ssn) + { + for (String dest: consumers.keySet()) + { + checkAndSendMessagesToConsumer(ssn,dest); + } + } + + private void checkAndSendMessagesToConsumer(Session ssn,String dest) + { + Consumer c = consumers.get(dest); + LinkedBlockingQueue<MessageTransfer> queue = exchange.getQueue(c._queueName); + MessageTransfer m = queue.poll(); + while (m != null && c._credit>0) + { + transferMessageToPeer(ssn,dest,m); + c._credit--; + m = queue.poll(); + } + } + + // ugly, but who cares :) + // assumes unit is always no of messages, not bytes + // assumes it's credit mode and not window + private static class Consumer + { + long _credit; + String _queueName; + } + + private static final class ToyBrokerSession extends Session + { + + public ToyBrokerSession(Connection connection, Binary name, long expiry, ToyExchange exchange) + { + super(connection, new ToyBroker(exchange), name, expiry); + } + } + + public static final void main(String[] args) throws IOException + { + final ToyExchange exchange = new ToyExchange(); + ConnectionDelegate delegate = new ServerDelegate() + { + @Override + public void init(Connection conn, ProtocolHeader hdr) + { + conn.setSessionFactory(new Connection.SessionFactory() + { + public Session newSession(Connection conn, Binary name, long expiry) + { + return new ToyBrokerSession(conn, name, expiry, exchange); + } + }); + + super.init(conn, hdr); //To change body of overridden methods use File | Settings | File Templates. + } + + }; + + MinaHandler.accept("0.0.0.0", 5672, delegate); + } + +} |