package org.openlcb.hub; import java.net.*; import java.io.*; import java.util.*; import java.util.concurrent.*; /** * Simple multi-threaded OpenLCB hub implementation. *

* Multiple connections send lines terminated by newline, * each of which is echoed to all other connections. * *

* main() directly invokes an object of the class. *

* Current threading model does all the sending from a * a single thread. If this is observed to back up & * halt all flow, individual transmit queues and threads * may be needed. * * @author Bob Jacobsen Copyright 2012 * @version $Revision: 17977 $ */ public class Hub { public final static int DEFAULT_PORT = 12021; final static int CAPACITY = 20; // not too long, to reduce delay public Hub() { this(Hub.DEFAULT_PORT); } public Hub(int port) { this.port = port; // create array server thread Thread t = new Thread() { public void run() { while (true) { try { // as items arrive in queue, forward to every available connection Memo m = queue.take(); for ( Forwarding e : threads) { e.forward(m); } } catch (InterruptedException e) { System.err.println("Hub: Interrupted in queue handling loop"); System.err.println(e); } } } }; t.setDaemon(true); t.start(); } ArrayBlockingQueue queue = new ArrayBlockingQueue(CAPACITY, true); //fairness ArrayList threads = new ArrayList(); int port; ServerSocket service; public void start() { try { service = new ServerSocket(port); while (true) { Socket clientSocket = service.accept(); ReaderThread r = new ReaderThread(clientSocket); addForwarder(r); r.start(); // not setting Daemon, so program will wait for thread to end before terminating notifyOwner("Connection started with "+getRemoteSocketAddress(clientSocket)); } } catch (IOException e) { System.err.println("Hub: Exception in main loop"); System.err.println(e); } } public int getPort() { return port; } public void addForwarder(Forwarding f) { threads.add(f); } public void notifyOwner(String line) { System.out.println(line); } // from jmri.util.SocketUtil String getRemoteSocketAddress(Socket socket) { try { return socket.getRemoteSocketAddress().toString(); } catch (Throwable e) { } finally { // return ""; } return ""; } public void putLine(String line) { try { queue.put(new Memo(line, null)); } catch (InterruptedException e) { System.err.println(e); } } public interface Forwarding { public void forward(Memo m); } class ReaderThread extends Thread implements Forwarding { ReaderThread(Socket clientSocket) { this.clientSocket = clientSocket; } Socket clientSocket; DataInputStream input; PrintStream output; public void run() { try { input = new DataInputStream(clientSocket.getInputStream()); output = new PrintStream(clientSocket.getOutputStream()); while (true) { String line = input.readLine(); if (line == null) break; // socket ended queue.put(new Memo(line, this)); } } catch (IOException e) { System.err.println("Hub: Error while handling input from "+getRemoteSocketAddress(clientSocket)); System.err.println(e); } catch (InterruptedException e) { System.err.println("Hub: Interrupted while handling input from "+getRemoteSocketAddress(clientSocket)); System.err.println(e); } threads.remove(this); notifyOwner("Connection ended with "+getRemoteSocketAddress(clientSocket)); try { clientSocket.close(); } catch (IOException e) { System.err.println("Hub: Error while closing socket at end of connection"); System.err.println(e); } } public void forward(Memo m) { if (! this.equals(m.source)) { output.println(m.line); } } } public class Memo { public String line; public Forwarding source; Memo(String line, Forwarding source) { this.line = line; this.source = source; } } static public void main(String[] args) { Hub h = new Hub(); h.start(); } }