extensions: Switch to NIO sockets & cleanup

* We're now using non-blocking sockets with extensions. This along with direct buffers should remove the lag suffered when using extensions.

Signed-off-by: Eduardo Alonso <edu@error404software.com>
This commit is contained in:
Eduardo Alonso 2019-03-27 11:32:55 +01:00
parent 8fe3f8f7ec
commit cdf98ebd5e
5 changed files with 219 additions and 215 deletions

View File

@ -5,7 +5,9 @@ import gearth.protocol.HPacket;
import gearth.ui.extensions.Extensions; import gearth.ui.extensions.Extensions;
import java.io.*; import java.io.*;
import java.net.Socket; import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -23,8 +25,8 @@ public abstract class Extension implements IExtension{
void act(String[] args); void act(String[] args);
} }
protected boolean canLeave; // can you disconnect the ext private boolean canLeave; // can you disconnect the ext
protected boolean canDelete; // can you delete the ext (will be false for some built-in extensions) private boolean canDelete; // can you delete the ext (will be false for some built-in extensions)
private String[] args; private String[] args;
private boolean isCorrupted = false; private boolean isCorrupted = false;
@ -32,7 +34,8 @@ public abstract class Extension implements IExtension{
private static final String[] FILE_FLAG = {"--filename", "-f"}; private static final String[] FILE_FLAG = {"--filename", "-f"};
private static final String[] COOKIE_FLAG = {"--auth-token", "-c"}; // don't add a cookie or filename when debugging private static final String[] COOKIE_FLAG = {"--auth-token", "-c"}; // don't add a cookie or filename when debugging
private OutputStream out = null; private SocketChannel gEarthExtensionServer = null;
private final Map<Integer, List<MessageListener>> incomingMessageListeners = new HashMap<>(); private final Map<Integer, List<MessageListener>> incomingMessageListeners = new HashMap<>();
private final Map<Integer, List<MessageListener>> outgoingMessageListeners = new HashMap<>(); private final Map<Integer, List<MessageListener>> outgoingMessageListeners = new HashMap<>();
private FlagsCheckListener flagRequestCallback = null; private FlagsCheckListener flagRequestCallback = null;
@ -87,37 +90,29 @@ public abstract class Extension implements IExtension{
String file = getArgument(args, FILE_FLAG); String file = getArgument(args, FILE_FLAG);
String cookie = getArgument(args, COOKIE_FLAG); String cookie = getArgument(args, COOKIE_FLAG);
Socket gEarthExtensionServer = null;
try { try {
gEarthExtensionServer = new Socket("127.0.0.1", port); gEarthExtensionServer = SocketChannel.open();
gEarthExtensionServer.setTcpNoDelay(true); gEarthExtensionServer.connect(new InetSocketAddress("127.0.0.1", port));
InputStream in = gEarthExtensionServer.getInputStream(); gEarthExtensionServer.socket().setTcpNoDelay(true);
DataInputStream dIn = new DataInputStream(in);
out = gEarthExtensionServer.getOutputStream();
while (!gEarthExtensionServer.isClosed()) { ByteBuffer bbLength = ByteBuffer.allocateDirect(4);
while (gEarthExtensionServer.isOpen()) {
int length; int length;
try {
length = dIn.readInt(); if (gEarthExtensionServer.read(bbLength) == -1)
}
catch(EOFException exception) {
//g-earth closed the extension
break; break;
}
byte[] headerandbody = new byte[length + 4]; length = bbLength.getInt(0);
bbLength.flip();
int amountRead = 0; ByteBuffer headerAndBody = ByteBuffer.allocateDirect(4 + length);
headerAndBody.putInt(0); // this will be the length
gEarthExtensionServer.read(headerAndBody);
while (amountRead < length) { HPacket packet = new HPacket(headerAndBody);
amountRead += dIn.read(headerandbody, 4 + amountRead, Math.min(dIn.available(), length - amountRead));
}
HPacket packet = new HPacket(headerandbody);
packet.fixLength(); packet.fixLength();
if (packet.headerId() == Extensions.OUTGOING_MESSAGES_IDS.INFOREQUEST) { if (packet.headerId() == Extensions.OUTGOING_MESSAGES_IDS.INFOREQUEST) {
ExtensionInfo info = getInfoAnnotations(); ExtensionInfo info = getInfoAnnotations();
@ -132,7 +127,9 @@ public abstract class Extension implements IExtension{
.appendString(cookie == null ? "" : cookie) .appendString(cookie == null ? "" : cookie)
.appendBoolean(canLeave) .appendBoolean(canLeave)
.appendBoolean(canDelete); .appendBoolean(canDelete);
writeToStream(response.toBytes());
ByteBuffer extInfo = ByteBuffer.wrap(response.toBytes());
gEarthExtensionServer.write(extInfo);
} }
else if (packet.headerId() == Extensions.OUTGOING_MESSAGES_IDS.CONNECTIONSTART) { else if (packet.headerId() == Extensions.OUTGOING_MESSAGES_IDS.CONNECTIONSTART) {
String host = packet.readString(); String host = packet.readString();
@ -204,7 +201,6 @@ public abstract class Extension implements IExtension{
response.appendLongString(habboMessage.stringify()); response.appendLongString(habboMessage.stringify());
writeToStream(response.toBytes()); writeToStream(response.toBytes());
} }
} }
@ -213,7 +209,7 @@ public abstract class Extension implements IExtension{
// e.printStackTrace(); // e.printStackTrace();
} }
finally { finally {
if (gEarthExtensionServer != null && !gEarthExtensionServer.isClosed()) { if (gEarthExtensionServer != null && gEarthExtensionServer.isOpen()) {
try { try {
gEarthExtensionServer.close(); gEarthExtensionServer.close();
} catch (IOException e) { } catch (IOException e) {
@ -225,7 +221,7 @@ public abstract class Extension implements IExtension{
private void writeToStream(byte[] bytes) throws IOException { private void writeToStream(byte[] bytes) throws IOException {
synchronized (this) { synchronized (this) {
out.write(bytes); gEarthExtensionServer.write(ByteBuffer.wrap(bytes));
} }
} }

View File

@ -20,6 +20,13 @@ public class HPacket implements StringifyAble {
public HPacket(byte[] packet) { public HPacket(byte[] packet) {
packetInBytes = packet.clone(); packetInBytes = packet.clone();
} }
public HPacket(ByteBuffer packet) {
packetInBytes = new byte[packet.capacity()];
for (int i = 0; i < packetInBytes.length; i++)
packetInBytes[i] = packet.get(i);
}
public HPacket(HPacket packet) { public HPacket(HPacket packet) {
packetInBytes = packet.packetInBytes.clone(); packetInBytes = packet.packetInBytes.clone();
isEdited = packet.isEdited; isEdited = packet.isEdited;

View File

@ -36,7 +36,7 @@ import java.util.*;
* Why? Because Habbo relies on the TCP protocol, which ENSURES that packets get received in the right order, so we will not be fucking that up. * Why? Because Habbo relies on the TCP protocol, which ENSURES that packets get received in the right order, so we will not be fucking that up.
* That means that all packets following the packet you're manipulating in your extension will be blocked from being sent untill you're done. * That means that all packets following the packet you're manipulating in your extension will be blocked from being sent untill you're done.
* TIP: If you're trying to replace a packet in your extension but you know it will take time, just block the packet, end the method, and let something asynchronous send * TIP: If you're trying to replace a packet in your extension but you know it will take time, just block the packet, end the method, and let something asynchronous send
* the editted packet when you're done. * the edited packet when you're done.
* *
* *
* You may ignore everything beneath this line if you're extending the abstract Extension class we provide in Java. * You may ignore everything beneath this line if you're extending the abstract Extension class we provide in Java.
@ -44,13 +44,13 @@ import java.util.*;
* *
* (0. We recommend to use a cross-platform language for your extension) * (0. We recommend to use a cross-platform language for your extension)
* *
* 1. An extension will run as a seperate process on your device and has to be called with the flag "-p <PORT>", * 1. An extension will run as a separate process on your device and has to be called with the flag "-p <PORT>",
* where <PORT> is a random port where the G-Earth local extension server will run on. Your extension has to connect with this server. * where <PORT> is a random port where the G-Earth local extension server will run on. Your extension has to connect with this server.
* *
* 2. G-Earth will open your program only ONCE, that is on the boot of G-Earth or when you install the exension. * 2. G-Earth will open your program only ONCE, that is on the boot of G-Earth or when you install the extension.
* Same story goes for closing the connection between the program and G-Earth, only once (on uninstall or close of G-Earth). * Same story goes for closing the connection between the program and G-Earth, only once (on uninstall or close of G-Earth).
* *
* You may also run your extension completely seperate from G-Earth for debugging purpose for example, then it won't be installed in G-Earth * You may also run your extension completely separate from G-Earth for debugging purpose for example, then it won't be installed in G-Earth
* (but you have to configure the port yourself, which will be displayed in the extension page) * (but you have to configure the port yourself, which will be displayed in the extension page)
* *
* 3. Once a connection is made, your extension will have to deal with the following incoming & outgoing messages as described (follows the same protocol structure as Habbo communication does): * 3. Once a connection is made, your extension will have to deal with the following incoming & outgoing messages as described (follows the same protocol structure as Habbo communication does):
@ -209,7 +209,6 @@ public class Extensions extends SubForm {
} }
} }
} }
} }
}; };
extension.addOnReceiveMessageListener(respondCallback); extension.addOnReceiveMessageListener(respondCallback);
@ -250,83 +249,80 @@ public class Extensions extends SubForm {
HashMap<GEarthExtension, GEarthExtension.ReceiveMessageListener> messageListeners = new HashMap<>(); HashMap<GEarthExtension, GEarthExtension.ReceiveMessageListener> messageListeners = new HashMap<>();
try { extensionsRegistrer = new GEarthExtensionsRegistrer(new GEarthExtensionsRegistrer.ExtensionRegisterObserver() {
extensionsRegistrer = new GEarthExtensionsRegistrer(new GEarthExtensionsRegistrer.ExtensionRegisterObserver() { @Override
@Override public void onConnect(GEarthExtension extension) {
public void onConnect(GEarthExtension extension) { synchronized (gEarthExtensions) {
synchronized (gEarthExtensions) { gEarthExtensions.add(extension);
gEarthExtensions.add(extension);
}
GEarthExtension.ReceiveMessageListener receiveMessageListener = message -> {
if (message.headerId() == INCOMING_MESSAGES_IDS.REQUESTFLAGS) { // no body
HPacket packet = new HPacket(OUTGOING_MESSAGES_IDS.FLAGSCHECK);
packet.appendInt(Main.args.length);
for (String arg : Main.args) {
packet.appendString(arg);
}
extension.sendMessage(packet);
}
else if (message.headerId() == INCOMING_MESSAGES_IDS.SENDMESSAGE) {
Byte side = message.readByte();
int byteLength = message.readInteger();
byte[] packetAsByteArray = message.readBytes(byteLength);
HPacket packet = new HPacket(packetAsByteArray);
if (!packet.isCorrupted()) {
if (side == 0) { // toclient
getHConnection().sendToClientAsync(packet);
}
else if (side == 1) { // toserver
getHConnection().sendToServerAsync(packet);
}
}
}
};
synchronized (messageListeners) {
messageListeners.put(extension, receiveMessageListener);
}
extension.addOnReceiveMessageListener(receiveMessageListener);
extension.sendMessage(new HPacket(OUTGOING_MESSAGES_IDS.INIT));
if (getHConnection().getState() == HConnection.State.CONNECTED) {
extension.sendMessage(
new HPacket(OUTGOING_MESSAGES_IDS.CONNECTIONSTART)
.appendString(getHConnection().getDomain())
.appendInt(getHConnection().getPort())
.appendString(getHConnection().getHotelVersion())
.appendString(HarbleAPIFetcher.HARBLEAPI == null ? "null" : HarbleAPIFetcher.HARBLEAPI.getPath())
);
}
extension.onRemoveClick(observable -> {
try {
extension.getConnection().close();
} catch (IOException e) {
e.printStackTrace();
}
});
extension.onClick(observable -> extension.sendMessage(new HPacket(OUTGOING_MESSAGES_IDS.ONDOUBLECLICK)));
Platform.runLater(() -> producer.extensionConnected(extension));
} }
@Override GEarthExtension.ReceiveMessageListener receiveMessageListener = message -> {
public void onDisconnect(GEarthExtension extension) { if (message.headerId() == INCOMING_MESSAGES_IDS.REQUESTFLAGS) { // no body
synchronized (gEarthExtensions) { HPacket packet = new HPacket(OUTGOING_MESSAGES_IDS.FLAGSCHECK);
gEarthExtensions.remove(extension); packet.appendInt(Main.args.length);
for (String arg : Main.args) {
packet.appendString(arg);
}
extension.sendMessage(packet);
} }
else if (message.headerId() == INCOMING_MESSAGES_IDS.SENDMESSAGE) {
Byte side = message.readByte();
int byteLength = message.readInteger();
byte[] packetAsByteArray = message.readBytes(byteLength);
synchronized (messageListeners) { HPacket packet = new HPacket(packetAsByteArray);
extension.removeOnReceiveMessageListener(messageListeners.get(extension)); if (!packet.isCorrupted()) {
messageListeners.remove(extension); if (side == 0) { // toclient
getHConnection().sendToClientAsync(packet);
}
else if (side == 1) { // toserver
getHConnection().sendToServerAsync(packet);
}
}
} }
Platform.runLater(extension::delete); };
synchronized (messageListeners) {
messageListeners.put(extension, receiveMessageListener);
} }
}); extension.addOnReceiveMessageListener(receiveMessageListener);
} catch (IOException e) {
e.printStackTrace(); extension.sendMessage(new HPacket(OUTGOING_MESSAGES_IDS.INIT));
} if (getHConnection().getState() == HConnection.State.CONNECTED) {
extension.sendMessage(
new HPacket(OUTGOING_MESSAGES_IDS.CONNECTIONSTART)
.appendString(getHConnection().getDomain())
.appendInt(getHConnection().getPort())
.appendString(getHConnection().getHotelVersion())
.appendString(HarbleAPIFetcher.HARBLEAPI == null ? "null" : HarbleAPIFetcher.HARBLEAPI.getPath())
);
}
extension.onRemoveClick(observable -> {
try {
extension.getConnection().close();
extension.disconnect();
} catch (IOException e) {
e.printStackTrace();
}
});
extension.onClick(observable -> extension.sendMessage(new HPacket(OUTGOING_MESSAGES_IDS.ONDOUBLECLICK)));
Platform.runLater(() -> producer.extensionConnected(extension));
}
@Override
public void onDisconnect(GEarthExtension extension) {
synchronized (gEarthExtensions) {
gEarthExtensions.remove(extension);
}
synchronized (messageListeners) {
extension.removeOnReceiveMessageListener(messageListeners.get(extension));
messageListeners.remove(extension);
}
Platform.runLater(extension::delete);
}
});
producer.setPort(extensionsRegistrer.getPort()); producer.setPort(extensionsRegistrer.getPort());
ext_port.setText(extensionsRegistrer.getPort()+""); ext_port.setText(extensionsRegistrer.getPort()+"");

View File

@ -3,11 +3,10 @@ package gearth.ui.extensions;
import javafx.beans.InvalidationListener; import javafx.beans.InvalidationListener;
import gearth.protocol.HPacket; import gearth.protocol.HPacket;
import java.io.DataInputStream; import java.io.*;
import java.io.IOException;
import java.io.InputStream; import java.nio.ByteBuffer;
import gearth.ui.extensions.authentication.Authenticator; import java.nio.channels.SocketChannel;
import java.net.Socket;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -29,57 +28,12 @@ public class GEarthExtension {
private String fileName; private String fileName;
private String cookie; private String cookie;
private Socket connection; private OnDisconnectedCallback disconnectedCallback;
//calls callback when the extension is creatd private SocketChannel connection;
static void create(Socket connection, OnCreatedCallback callback, OnDisconnectedCallback onDisconnectedCallback) {
new Thread(() -> { //calls callback when the extension is created
try { GEarthExtension(HPacket extensionInfo, SocketChannel connection, OnDisconnectedCallback onDisconnectedCallback) {
synchronized (connection) {
connection.getOutputStream().write((new HPacket(Extensions.OUTGOING_MESSAGES_IDS.INFOREQUEST)).toBytes());
}
InputStream inputStream = connection.getInputStream();
DataInputStream dIn = new DataInputStream(inputStream);
while (!connection.isClosed()) {
int length = dIn.readInt();
byte[] headerandbody = new byte[length + 4];
int amountRead = 0;
while (amountRead < length) {
amountRead += dIn.read(headerandbody, 4 + amountRead, Math.min(dIn.available(), length - amountRead));
}
HPacket packet = new HPacket(headerandbody);
packet.fixLength();
if (packet.headerId() == Extensions.INCOMING_MESSAGES_IDS.EXTENSIONINFO) {
GEarthExtension gEarthExtension = new GEarthExtension(
packet,
connection,
onDisconnectedCallback
);
if (Authenticator.evaluate(gEarthExtension)) {
callback.act(gEarthExtension);
}
else {
gEarthExtension.closeConnection(); //you shall not pass...
}
break;
}
}
} catch (IOException ignored) {}
}).start();
}
private GEarthExtension(HPacket extensionInfo, Socket connection, OnDisconnectedCallback onDisconnectedCallback) {
this.title = extensionInfo.readString(); this.title = extensionInfo.readString();
this.author = extensionInfo.readString(); this.author = extensionInfo.readString();
this.version = extensionInfo.readString(); this.version = extensionInfo.readString();
@ -94,52 +48,23 @@ public class GEarthExtension {
this.deleteButtonVisible = extensionInfo.readBoolean(); this.deleteButtonVisible = extensionInfo.readBoolean();
this.connection = connection; this.connection = connection;
this.disconnectedCallback = onDisconnectedCallback;
GEarthExtension selff = this;
new Thread(() -> {
try {
InputStream inputStream = connection.getInputStream();
DataInputStream dIn = new DataInputStream(inputStream);
while (!connection.isClosed()) {
int length = dIn.readInt();
byte[] headerandbody = new byte[length + 4];
int amountRead = 0;
while (amountRead < length) {
amountRead += dIn.read(headerandbody, 4 + amountRead, Math.min(dIn.available(), length - amountRead));
}
HPacket packet = new HPacket(headerandbody);
packet.fixLength();
synchronized (receiveMessageListeners) {
for (int i = receiveMessageListeners.size() - 1; i >= 0; i--) {
receiveMessageListeners.get(i).act(packet);
packet.setReadIndex(6);
}
}
}
} catch (IOException e) {
// An extension disconnected, which is OK
} finally {
onDisconnectedCallback.act(selff);
if (!connection.isClosed()) {
try {
connection.close();
} catch (IOException e) {
// e.printStackTrace();
}
}
}
}).start();
} }
public Socket getConnection() { public void act(HPacket packet) {
synchronized (receiveMessageListeners) {
for (int i = receiveMessageListeners.size() - 1; i >= 0; i--) {
receiveMessageListeners.get(i).act(packet);
packet.setReadIndex(6);
}
}
}
void disconnect() {
disconnectedCallback.act(this);
}
public SocketChannel getConnection() {
return connection; return connection;
} }
@ -187,7 +112,7 @@ public class GEarthExtension {
public boolean sendMessage(HPacket message) { public boolean sendMessage(HPacket message) {
try { try {
synchronized (this) { synchronized (this) {
connection.getOutputStream().write(message.toBytes()); connection.write(ByteBuffer.wrap(message.toBytes()));
} }
return true; return true;
} catch (IOException e) { } catch (IOException e) {

View File

@ -1,19 +1,28 @@
package gearth.ui.extensions; package gearth.ui.extensions;
import gearth.protocol.HPacket;
import gearth.ui.extensions.authentication.Authenticator;
import java.io.IOException; import java.io.IOException;
import java.net.ServerSocket; import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/** /**
* Created by Jonas on 21/06/18. * Created by Jonas on 21/06/18.
*/ */
public class GEarthExtensionsRegistrer { public class GEarthExtensionsRegistrer {
private ServerSocket serverSocket; private ServerSocketChannel serverSocket;
private Selector selector;
GEarthExtensionsRegistrer(ExtensionRegisterObserver observer) throws IOException { GEarthExtensionsRegistrer(ExtensionRegisterObserver observer) {
// serverSocket = new ServerSocket(0);
int port = 9092; int port = 9092;
boolean serverSetup = false; boolean serverSetup = false;
while (!serverSetup) { while (!serverSetup) {
@ -21,20 +30,86 @@ public class GEarthExtensionsRegistrer {
port++; port++;
} }
new Thread(() -> { new Thread(() -> {
try { try {
while (!serverSocket.isClosed()) { ByteBuffer lenBuf = ByteBuffer.allocateDirect(4);
Socket extensionSocket = serverSocket.accept(); while (selector.isOpen()) {
GEarthExtension.create(extensionSocket, observer::onConnect, observer::onDisconnect); selector.select();
Iterator keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = (SelectionKey) keys.next();
keys.remove();
if (!key.isValid()) continue;
if (key.isAcceptable())
accept(key);
else if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
if (!channel.isOpen())
continue;
if(channel.read(lenBuf) == -1) {
channel.close();
continue;
}
ByteBuffer headerAndBody = ByteBuffer.allocateDirect(4 + lenBuf.getInt(0));
lenBuf.flip();
headerAndBody.putInt(0);// this will be the length
channel.read(headerAndBody);
HPacket packet = new HPacket(headerAndBody);
packet.fixLength();
if (packet.headerId() == Extensions.INCOMING_MESSAGES_IDS.EXTENSIONINFO) {
GEarthExtension gEarthExtension = new GEarthExtension(
packet,
channel,
observer::onDisconnect
);
key.attach(gEarthExtension);
if (Authenticator.evaluate(gEarthExtension)) {
observer.onConnect(gEarthExtension);
} else {
gEarthExtension.closeConnection(); //you shall not pass...
}
} else if (key.attachment() != null)
((GEarthExtension)key.attachment()).act(packet);
}
}
} }
} catch (IOException e) {e.printStackTrace();} } catch (IOException e) {e.printStackTrace();}
}).start(); }).start();
} }
private void accept(SelectionKey key) {
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
try {
SocketChannel channel = serverChannel.accept();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
ByteBuffer arr = ByteBuffer.wrap((new HPacket(Extensions.OUTGOING_MESSAGES_IDS.INFOREQUEST)).toBytes());
channel.write(arr);
} catch (IOException e) {
e.printStackTrace();
}
}
private boolean createServer(int port) { private boolean createServer(int port) {
try { try {
serverSocket = new ServerSocket(port); serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress(port));
selector = Selector.open();
serverSocket.configureBlocking(false);
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
return true; return true;
} catch (IOException e) { } catch (IOException e) {
return false; return false;
@ -42,7 +117,12 @@ public class GEarthExtensionsRegistrer {
} }
public int getPort() { public int getPort() {
return serverSocket.getLocalPort(); try {
return ((InetSocketAddress) serverSocket.getLocalAddress()).getPort();
} catch (IOException e) {
e.printStackTrace();
return 0;
}
} }
public interface ExtensionRegisterObserver { public interface ExtensionRegisterObserver {