From cdf98ebd5e19d5cee4a754dc319887cd9f55427e Mon Sep 17 00:00:00 2001 From: Eduardo Alonso Date: Wed, 27 Mar 2019 11:32:55 +0100 Subject: [PATCH] extensions: Switch to NIO sockets & cleanup * We're now using non-blocking sockets with extensions. This along with direct buffers should remove the lag suffered when using extensions. Signed-off-by: Eduardo Alonso --- .../java/gearth/extensions/Extension.java | 54 +++---- .../main/java/gearth/protocol/HPacket.java | 7 + .../java/gearth/ui/extensions/Extensions.java | 148 +++++++++--------- .../gearth/ui/extensions/GEarthExtension.java | 123 +++------------ .../extensions/GEarthExtensionsRegistrer.java | 102 ++++++++++-- 5 files changed, 219 insertions(+), 215 deletions(-) diff --git a/G-Earth/src/main/java/gearth/extensions/Extension.java b/G-Earth/src/main/java/gearth/extensions/Extension.java index ddf1331..eebc42f 100644 --- a/G-Earth/src/main/java/gearth/extensions/Extension.java +++ b/G-Earth/src/main/java/gearth/extensions/Extension.java @@ -5,7 +5,9 @@ import gearth.protocol.HPacket; import gearth.ui.extensions.Extensions; import java.io.*; -import java.net.Socket; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -23,8 +25,8 @@ public abstract class Extension implements IExtension{ void act(String[] args); } - protected boolean canLeave; // can you disconnect the ext - protected boolean canDelete; // can you delete the ext (will be false for some built-in extensions) + private boolean canLeave; // can you disconnect the ext + private boolean canDelete; // can you delete the ext (will be false for some built-in extensions) private String[] args; private boolean isCorrupted = false; @@ -32,7 +34,8 @@ public abstract class Extension implements IExtension{ private static final String[] FILE_FLAG = {"--filename", "-f"}; private static final String[] COOKIE_FLAG = {"--auth-token", "-c"}; // don't add a cookie or filename when debugging - private OutputStream out = null; + private SocketChannel gEarthExtensionServer = null; + private final Map> incomingMessageListeners = new HashMap<>(); private final Map> outgoingMessageListeners = new HashMap<>(); private FlagsCheckListener flagRequestCallback = null; @@ -87,37 +90,29 @@ public abstract class Extension implements IExtension{ String file = getArgument(args, FILE_FLAG); String cookie = getArgument(args, COOKIE_FLAG); - Socket gEarthExtensionServer = null; try { - gEarthExtensionServer = new Socket("127.0.0.1", port); - gEarthExtensionServer.setTcpNoDelay(true); - InputStream in = gEarthExtensionServer.getInputStream(); - DataInputStream dIn = new DataInputStream(in); - out = gEarthExtensionServer.getOutputStream(); + gEarthExtensionServer = SocketChannel.open(); + gEarthExtensionServer.connect(new InetSocketAddress("127.0.0.1", port)); + gEarthExtensionServer.socket().setTcpNoDelay(true); - while (!gEarthExtensionServer.isClosed()) { + ByteBuffer bbLength = ByteBuffer.allocateDirect(4); + while (gEarthExtensionServer.isOpen()) { int length; - try { - length = dIn.readInt(); - } - catch(EOFException exception) { - //g-earth closed the extension + + if (gEarthExtensionServer.read(bbLength) == -1) break; - } - byte[] headerandbody = new byte[length + 4]; + length = bbLength.getInt(0); + bbLength.flip(); - int amountRead = 0; + ByteBuffer headerAndBody = ByteBuffer.allocateDirect(4 + length); + headerAndBody.putInt(0); // this will be the length + gEarthExtensionServer.read(headerAndBody); - while (amountRead < length) { - amountRead += dIn.read(headerandbody, 4 + amountRead, Math.min(dIn.available(), length - amountRead)); - } - - HPacket packet = new HPacket(headerandbody); + HPacket packet = new HPacket(headerAndBody); packet.fixLength(); - if (packet.headerId() == Extensions.OUTGOING_MESSAGES_IDS.INFOREQUEST) { ExtensionInfo info = getInfoAnnotations(); @@ -132,7 +127,9 @@ public abstract class Extension implements IExtension{ .appendString(cookie == null ? "" : cookie) .appendBoolean(canLeave) .appendBoolean(canDelete); - writeToStream(response.toBytes()); + + ByteBuffer extInfo = ByteBuffer.wrap(response.toBytes()); + gEarthExtensionServer.write(extInfo); } else if (packet.headerId() == Extensions.OUTGOING_MESSAGES_IDS.CONNECTIONSTART) { String host = packet.readString(); @@ -204,7 +201,6 @@ public abstract class Extension implements IExtension{ response.appendLongString(habboMessage.stringify()); writeToStream(response.toBytes()); - } } @@ -213,7 +209,7 @@ public abstract class Extension implements IExtension{ // e.printStackTrace(); } finally { - if (gEarthExtensionServer != null && !gEarthExtensionServer.isClosed()) { + if (gEarthExtensionServer != null && gEarthExtensionServer.isOpen()) { try { gEarthExtensionServer.close(); } catch (IOException e) { @@ -225,7 +221,7 @@ public abstract class Extension implements IExtension{ private void writeToStream(byte[] bytes) throws IOException { synchronized (this) { - out.write(bytes); + gEarthExtensionServer.write(ByteBuffer.wrap(bytes)); } } diff --git a/G-Earth/src/main/java/gearth/protocol/HPacket.java b/G-Earth/src/main/java/gearth/protocol/HPacket.java index e970c30..31e220a 100644 --- a/G-Earth/src/main/java/gearth/protocol/HPacket.java +++ b/G-Earth/src/main/java/gearth/protocol/HPacket.java @@ -20,6 +20,13 @@ public class HPacket implements StringifyAble { public HPacket(byte[] packet) { packetInBytes = packet.clone(); } + + public HPacket(ByteBuffer packet) { + packetInBytes = new byte[packet.capacity()]; + for (int i = 0; i < packetInBytes.length; i++) + packetInBytes[i] = packet.get(i); + } + public HPacket(HPacket packet) { packetInBytes = packet.packetInBytes.clone(); isEdited = packet.isEdited; diff --git a/G-Earth/src/main/java/gearth/ui/extensions/Extensions.java b/G-Earth/src/main/java/gearth/ui/extensions/Extensions.java index 963bd1c..7208a99 100644 --- a/G-Earth/src/main/java/gearth/ui/extensions/Extensions.java +++ b/G-Earth/src/main/java/gearth/ui/extensions/Extensions.java @@ -36,7 +36,7 @@ import java.util.*; * Why? Because Habbo relies on the TCP protocol, which ENSURES that packets get received in the right order, so we will not be fucking that up. * That means that all packets following the packet you're manipulating in your extension will be blocked from being sent untill you're done. * TIP: If you're trying to replace a packet in your extension but you know it will take time, just block the packet, end the method, and let something asynchronous send - * the editted packet when you're done. + * the edited packet when you're done. * * * You may ignore everything beneath this line if you're extending the abstract Extension class we provide in Java. @@ -44,13 +44,13 @@ import java.util.*; * * (0. We recommend to use a cross-platform language for your extension) * - * 1. An extension will run as a seperate process on your device and has to be called with the flag "-p ", + * 1. An extension will run as a separate process on your device and has to be called with the flag "-p ", * where is a random port where the G-Earth local extension server will run on. Your extension has to connect with this server. * - * 2. G-Earth will open your program only ONCE, that is on the boot of G-Earth or when you install the exension. + * 2. G-Earth will open your program only ONCE, that is on the boot of G-Earth or when you install the extension. * Same story goes for closing the connection between the program and G-Earth, only once (on uninstall or close of G-Earth). * - * You may also run your extension completely seperate from G-Earth for debugging purpose for example, then it won't be installed in G-Earth + * You may also run your extension completely separate from G-Earth for debugging purpose for example, then it won't be installed in G-Earth * (but you have to configure the port yourself, which will be displayed in the extension page) * * 3. Once a connection is made, your extension will have to deal with the following incoming & outgoing messages as described (follows the same protocol structure as Habbo communication does): @@ -209,7 +209,6 @@ public class Extensions extends SubForm { } } } - } }; extension.addOnReceiveMessageListener(respondCallback); @@ -250,83 +249,80 @@ public class Extensions extends SubForm { HashMap messageListeners = new HashMap<>(); - try { - extensionsRegistrer = new GEarthExtensionsRegistrer(new GEarthExtensionsRegistrer.ExtensionRegisterObserver() { - @Override - public void onConnect(GEarthExtension extension) { - synchronized (gEarthExtensions) { - gEarthExtensions.add(extension); - } - - GEarthExtension.ReceiveMessageListener receiveMessageListener = message -> { - if (message.headerId() == INCOMING_MESSAGES_IDS.REQUESTFLAGS) { // no body - HPacket packet = new HPacket(OUTGOING_MESSAGES_IDS.FLAGSCHECK); - packet.appendInt(Main.args.length); - for (String arg : Main.args) { - packet.appendString(arg); - } - extension.sendMessage(packet); - } - else if (message.headerId() == INCOMING_MESSAGES_IDS.SENDMESSAGE) { - Byte side = message.readByte(); - int byteLength = message.readInteger(); - byte[] packetAsByteArray = message.readBytes(byteLength); - - HPacket packet = new HPacket(packetAsByteArray); - if (!packet.isCorrupted()) { - if (side == 0) { // toclient - getHConnection().sendToClientAsync(packet); - } - else if (side == 1) { // toserver - getHConnection().sendToServerAsync(packet); - } - } - } - }; - synchronized (messageListeners) { - messageListeners.put(extension, receiveMessageListener); - } - extension.addOnReceiveMessageListener(receiveMessageListener); - - extension.sendMessage(new HPacket(OUTGOING_MESSAGES_IDS.INIT)); - if (getHConnection().getState() == HConnection.State.CONNECTED) { - extension.sendMessage( - new HPacket(OUTGOING_MESSAGES_IDS.CONNECTIONSTART) - .appendString(getHConnection().getDomain()) - .appendInt(getHConnection().getPort()) - .appendString(getHConnection().getHotelVersion()) - .appendString(HarbleAPIFetcher.HARBLEAPI == null ? "null" : HarbleAPIFetcher.HARBLEAPI.getPath()) - ); - } - - extension.onRemoveClick(observable -> { - try { - extension.getConnection().close(); - } catch (IOException e) { - e.printStackTrace(); - } - }); - extension.onClick(observable -> extension.sendMessage(new HPacket(OUTGOING_MESSAGES_IDS.ONDOUBLECLICK))); - - Platform.runLater(() -> producer.extensionConnected(extension)); + extensionsRegistrer = new GEarthExtensionsRegistrer(new GEarthExtensionsRegistrer.ExtensionRegisterObserver() { + @Override + public void onConnect(GEarthExtension extension) { + synchronized (gEarthExtensions) { + gEarthExtensions.add(extension); } - @Override - public void onDisconnect(GEarthExtension extension) { - synchronized (gEarthExtensions) { - gEarthExtensions.remove(extension); + GEarthExtension.ReceiveMessageListener receiveMessageListener = message -> { + if (message.headerId() == INCOMING_MESSAGES_IDS.REQUESTFLAGS) { // no body + HPacket packet = new HPacket(OUTGOING_MESSAGES_IDS.FLAGSCHECK); + packet.appendInt(Main.args.length); + for (String arg : Main.args) { + packet.appendString(arg); + } + extension.sendMessage(packet); } + else if (message.headerId() == INCOMING_MESSAGES_IDS.SENDMESSAGE) { + Byte side = message.readByte(); + int byteLength = message.readInteger(); + byte[] packetAsByteArray = message.readBytes(byteLength); - synchronized (messageListeners) { - extension.removeOnReceiveMessageListener(messageListeners.get(extension)); - messageListeners.remove(extension); + HPacket packet = new HPacket(packetAsByteArray); + if (!packet.isCorrupted()) { + if (side == 0) { // toclient + getHConnection().sendToClientAsync(packet); + } + else if (side == 1) { // toserver + getHConnection().sendToServerAsync(packet); + } + } } - Platform.runLater(extension::delete); + }; + synchronized (messageListeners) { + messageListeners.put(extension, receiveMessageListener); } - }); - } catch (IOException e) { - e.printStackTrace(); - } + extension.addOnReceiveMessageListener(receiveMessageListener); + + extension.sendMessage(new HPacket(OUTGOING_MESSAGES_IDS.INIT)); + if (getHConnection().getState() == HConnection.State.CONNECTED) { + extension.sendMessage( + new HPacket(OUTGOING_MESSAGES_IDS.CONNECTIONSTART) + .appendString(getHConnection().getDomain()) + .appendInt(getHConnection().getPort()) + .appendString(getHConnection().getHotelVersion()) + .appendString(HarbleAPIFetcher.HARBLEAPI == null ? "null" : HarbleAPIFetcher.HARBLEAPI.getPath()) + ); + } + + extension.onRemoveClick(observable -> { + try { + extension.getConnection().close(); + extension.disconnect(); + } catch (IOException e) { + e.printStackTrace(); + } + }); + extension.onClick(observable -> extension.sendMessage(new HPacket(OUTGOING_MESSAGES_IDS.ONDOUBLECLICK))); + + Platform.runLater(() -> producer.extensionConnected(extension)); + } + + @Override + public void onDisconnect(GEarthExtension extension) { + synchronized (gEarthExtensions) { + gEarthExtensions.remove(extension); + } + + synchronized (messageListeners) { + extension.removeOnReceiveMessageListener(messageListeners.get(extension)); + messageListeners.remove(extension); + } + Platform.runLater(extension::delete); + } + }); producer.setPort(extensionsRegistrer.getPort()); ext_port.setText(extensionsRegistrer.getPort()+""); diff --git a/G-Earth/src/main/java/gearth/ui/extensions/GEarthExtension.java b/G-Earth/src/main/java/gearth/ui/extensions/GEarthExtension.java index 1d0d7a8..6d8b964 100644 --- a/G-Earth/src/main/java/gearth/ui/extensions/GEarthExtension.java +++ b/G-Earth/src/main/java/gearth/ui/extensions/GEarthExtension.java @@ -3,11 +3,10 @@ package gearth.ui.extensions; import javafx.beans.InvalidationListener; import gearth.protocol.HPacket; -import java.io.DataInputStream; -import java.io.IOException; -import java.io.InputStream; -import gearth.ui.extensions.authentication.Authenticator; -import java.net.Socket; +import java.io.*; + +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.List; @@ -29,57 +28,12 @@ public class GEarthExtension { private String fileName; private String cookie; - private Socket connection; + private OnDisconnectedCallback disconnectedCallback; - //calls callback when the extension is creatd - static void create(Socket connection, OnCreatedCallback callback, OnDisconnectedCallback onDisconnectedCallback) { + private SocketChannel connection; - new Thread(() -> { - try { - synchronized (connection) { - connection.getOutputStream().write((new HPacket(Extensions.OUTGOING_MESSAGES_IDS.INFOREQUEST)).toBytes()); - } - - InputStream inputStream = connection.getInputStream(); - DataInputStream dIn = new DataInputStream(inputStream); - - while (!connection.isClosed()) { - - int length = dIn.readInt(); - byte[] headerandbody = new byte[length + 4]; - - int amountRead = 0; - while (amountRead < length) { - amountRead += dIn.read(headerandbody, 4 + amountRead, Math.min(dIn.available(), length - amountRead)); - } - - HPacket packet = new HPacket(headerandbody); - packet.fixLength(); - - if (packet.headerId() == Extensions.INCOMING_MESSAGES_IDS.EXTENSIONINFO) { - GEarthExtension gEarthExtension = new GEarthExtension( - packet, - connection, - onDisconnectedCallback - ); - - if (Authenticator.evaluate(gEarthExtension)) { - callback.act(gEarthExtension); - } - else { - gEarthExtension.closeConnection(); //you shall not pass... - } - - break; - } - } - - } catch (IOException ignored) {} - }).start(); - - } - - private GEarthExtension(HPacket extensionInfo, Socket connection, OnDisconnectedCallback onDisconnectedCallback) { + //calls callback when the extension is created + GEarthExtension(HPacket extensionInfo, SocketChannel connection, OnDisconnectedCallback onDisconnectedCallback) { this.title = extensionInfo.readString(); this.author = extensionInfo.readString(); this.version = extensionInfo.readString(); @@ -94,52 +48,23 @@ public class GEarthExtension { this.deleteButtonVisible = extensionInfo.readBoolean(); this.connection = connection; - - GEarthExtension selff = this; - new Thread(() -> { - try { - InputStream inputStream = connection.getInputStream(); - DataInputStream dIn = new DataInputStream(inputStream); - - while (!connection.isClosed()) { - int length = dIn.readInt(); - byte[] headerandbody = new byte[length + 4]; - - int amountRead = 0; - while (amountRead < length) { - amountRead += dIn.read(headerandbody, 4 + amountRead, Math.min(dIn.available(), length - amountRead)); - } - - HPacket packet = new HPacket(headerandbody); - packet.fixLength(); - - synchronized (receiveMessageListeners) { - for (int i = receiveMessageListeners.size() - 1; i >= 0; i--) { - receiveMessageListeners.get(i).act(packet); - packet.setReadIndex(6); - } - } - - } - - } catch (IOException e) { - // An extension disconnected, which is OK - } finally { - onDisconnectedCallback.act(selff); - if (!connection.isClosed()) { - try { - connection.close(); - } catch (IOException e) { -// e.printStackTrace(); - } - } - } - }).start(); - - + this.disconnectedCallback = onDisconnectedCallback; } - public Socket getConnection() { + public void act(HPacket packet) { + synchronized (receiveMessageListeners) { + for (int i = receiveMessageListeners.size() - 1; i >= 0; i--) { + receiveMessageListeners.get(i).act(packet); + packet.setReadIndex(6); + } + } + } + + void disconnect() { + disconnectedCallback.act(this); + } + + public SocketChannel getConnection() { return connection; } @@ -187,7 +112,7 @@ public class GEarthExtension { public boolean sendMessage(HPacket message) { try { synchronized (this) { - connection.getOutputStream().write(message.toBytes()); + connection.write(ByteBuffer.wrap(message.toBytes())); } return true; } catch (IOException e) { diff --git a/G-Earth/src/main/java/gearth/ui/extensions/GEarthExtensionsRegistrer.java b/G-Earth/src/main/java/gearth/ui/extensions/GEarthExtensionsRegistrer.java index 083c181..e8993cc 100644 --- a/G-Earth/src/main/java/gearth/ui/extensions/GEarthExtensionsRegistrer.java +++ b/G-Earth/src/main/java/gearth/ui/extensions/GEarthExtensionsRegistrer.java @@ -1,19 +1,28 @@ package gearth.ui.extensions; +import gearth.protocol.HPacket; +import gearth.ui.extensions.authentication.Authenticator; + import java.io.IOException; -import java.net.ServerSocket; -import java.net.Socket; +import java.net.InetSocketAddress; + +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Iterator; /** * Created by Jonas on 21/06/18. */ public class GEarthExtensionsRegistrer { - private ServerSocket serverSocket; + private ServerSocketChannel serverSocket; + private Selector selector; - GEarthExtensionsRegistrer(ExtensionRegisterObserver observer) throws IOException { + GEarthExtensionsRegistrer(ExtensionRegisterObserver observer) { -// serverSocket = new ServerSocket(0); int port = 9092; boolean serverSetup = false; while (!serverSetup) { @@ -21,20 +30,86 @@ public class GEarthExtensionsRegistrer { port++; } - new Thread(() -> { try { - while (!serverSocket.isClosed()) { - Socket extensionSocket = serverSocket.accept(); - GEarthExtension.create(extensionSocket, observer::onConnect, observer::onDisconnect); + ByteBuffer lenBuf = ByteBuffer.allocateDirect(4); + while (selector.isOpen()) { + selector.select(); + + Iterator keys = selector.selectedKeys().iterator(); + while (keys.hasNext()) { + SelectionKey key = (SelectionKey) keys.next(); + keys.remove(); + + if (!key.isValid()) continue; + + if (key.isAcceptable()) + accept(key); + else if (key.isReadable()) { + SocketChannel channel = (SocketChannel) key.channel(); + if (!channel.isOpen()) + continue; + + if(channel.read(lenBuf) == -1) { + channel.close(); + continue; + } + + ByteBuffer headerAndBody = ByteBuffer.allocateDirect(4 + lenBuf.getInt(0)); + lenBuf.flip(); + headerAndBody.putInt(0);// this will be the length + + channel.read(headerAndBody); + + HPacket packet = new HPacket(headerAndBody); + packet.fixLength(); + + if (packet.headerId() == Extensions.INCOMING_MESSAGES_IDS.EXTENSIONINFO) { + GEarthExtension gEarthExtension = new GEarthExtension( + packet, + channel, + observer::onDisconnect + ); + key.attach(gEarthExtension); + + if (Authenticator.evaluate(gEarthExtension)) { + observer.onConnect(gEarthExtension); + } else { + gEarthExtension.closeConnection(); //you shall not pass... + } + } else if (key.attachment() != null) + ((GEarthExtension)key.attachment()).act(packet); + } + } } } catch (IOException e) {e.printStackTrace();} }).start(); } + private void accept(SelectionKey key) { + ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); + try { + SocketChannel channel = serverChannel.accept(); + channel.configureBlocking(false); + channel.register(selector, SelectionKey.OP_READ); + + + ByteBuffer arr = ByteBuffer.wrap((new HPacket(Extensions.OUTGOING_MESSAGES_IDS.INFOREQUEST)).toBytes()); + channel.write(arr); + + } catch (IOException e) { + e.printStackTrace(); + } + } + + private boolean createServer(int port) { try { - serverSocket = new ServerSocket(port); + serverSocket = ServerSocketChannel.open(); + serverSocket.bind(new InetSocketAddress(port)); + selector = Selector.open(); + serverSocket.configureBlocking(false); + serverSocket.register(selector, SelectionKey.OP_ACCEPT); return true; } catch (IOException e) { return false; @@ -42,7 +117,12 @@ public class GEarthExtensionsRegistrer { } public int getPort() { - return serverSocket.getLocalPort(); + try { + return ((InetSocketAddress) serverSocket.getLocalAddress()).getPort(); + } catch (IOException e) { + e.printStackTrace(); + return 0; + } } public interface ExtensionRegisterObserver {