From 1579bed1ef9dfab3d692644ca6efa96285d521f6 Mon Sep 17 00:00:00 2001 From: UnfamiliarLegacy <74633542+UnfamiliarLegacy@users.noreply.github.com> Date: Wed, 7 Jun 2023 19:59:55 +0200 Subject: [PATCH] Fix discarded packets --- .../proxy/nitro/NitroConnectionState.java | 14 ++++++-- .../proxy/nitro/NitroPacketQueue.java | 28 ++++++++++++++++ .../nitro/websocket/NitroWebsocketClient.java | 33 ++++++++++++++++++- 3 files changed, 71 insertions(+), 4 deletions(-) create mode 100644 G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/NitroPacketQueue.java diff --git a/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/NitroConnectionState.java b/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/NitroConnectionState.java index 42c7b20..c5381f7 100644 --- a/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/NitroConnectionState.java +++ b/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/NitroConnectionState.java @@ -30,16 +30,24 @@ public class NitroConnectionState { this.checkConnected(); } - public void checkConnected() { + public boolean isConnected() { if (this.aborting) { - return; + return false; } if (!this.toClient) { - return; + return false; } if (!this.toServer) { + return false; + } + + return true; + } + + private void checkConnected() { + if (!this.isConnected()) { return; } diff --git a/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/NitroPacketQueue.java b/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/NitroPacketQueue.java new file mode 100644 index 0000000..9fc2869 --- /dev/null +++ b/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/NitroPacketQueue.java @@ -0,0 +1,28 @@ +package gearth.protocol.connection.proxy.nitro; + +import gearth.protocol.packethandler.nitro.NitroPacketHandler; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.Queue; + +public class NitroPacketQueue { + + private final NitroPacketHandler packetHandler; + private final Queue packets; + + public NitroPacketQueue(NitroPacketHandler packetHandler) { + this.packetHandler = packetHandler; + this.packets = new LinkedList<>(); + } + + public void enqueue(byte[] b) { + this.packets.add(b); + } + + public synchronized void flush() throws IOException { + while (!this.packets.isEmpty()) { + this.packetHandler.act(this.packets.remove()); + } + } +} diff --git a/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/websocket/NitroWebsocketClient.java b/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/websocket/NitroWebsocketClient.java index 1c0cc78..8740fd9 100644 --- a/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/websocket/NitroWebsocketClient.java +++ b/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/websocket/NitroWebsocketClient.java @@ -2,9 +2,11 @@ package gearth.protocol.connection.proxy.nitro.websocket; import gearth.protocol.HConnection; import gearth.protocol.HMessage; +import gearth.protocol.StateChangeListener; import gearth.protocol.connection.*; import gearth.protocol.connection.proxy.nitro.NitroConnectionState; import gearth.protocol.connection.proxy.nitro.NitroConstants; +import gearth.protocol.connection.proxy.nitro.NitroPacketQueue; import gearth.protocol.connection.proxy.nitro.NitroProxyProvider; import gearth.protocol.packethandler.nitro.NitroPacketHandler; import org.eclipse.jetty.websocket.jsr356.JsrSession; @@ -26,20 +28,24 @@ public class NitroWebsocketClient implements NitroSession { private static final Logger logger = LoggerFactory.getLogger(NitroWebsocketClient.class); private final HProxySetter proxySetter; + private final HConnection connection; private final NitroConnectionState state; private final NitroProxyProvider proxyProvider; private final NitroWebsocketServer server; private final NitroPacketHandler packetHandler; + private final NitroPacketQueue packetQueue; private final AtomicBoolean shutdownLock; private JsrSession activeSession = null; public NitroWebsocketClient(HProxySetter proxySetter, HStateSetter stateSetter, HConnection connection, NitroProxyProvider proxyProvider) { this.proxySetter = proxySetter; + this.connection = connection; this.state = new NitroConnectionState(stateSetter); this.proxyProvider = proxyProvider; this.server = new NitroWebsocketServer(connection, this, this.state); this.packetHandler = new NitroPacketHandler(HMessage.Direction.TOSERVER, server, connection.getExtensionHandler(), connection.getTrafficObservables()); + this.packetQueue = new NitroPacketQueue(this.packetHandler); this.shutdownLock = new AtomicBoolean(); } @@ -47,6 +53,24 @@ public class NitroWebsocketClient implements NitroSession { public void onOpen(Session session) throws Exception { logger.info("WebSocket connection accepted"); + // Setup state change listener + connection.getStateObservable().addListener(new StateChangeListener() { + @Override + public void stateChanged(HState oldState, HState newState) { + // Clean up when we don't need it anymore. + if ((oldState == HState.WAITING_FOR_CLIENT || newState == HState.NOT_CONNECTED) || newState == HState.ABORTING) { + connection.getStateObservable().removeListener(this); + } + + // Process queue when connected. + try { + packetQueue.flush(); + } catch (IOException e) { + logger.error("Failed to flush packet queue in state change listener", e); + } + } + }); + activeSession = (JsrSession) session; activeSession.setMaxBinaryMessageBufferSize(NitroConstants.WEBSOCKET_BUFFER_SIZE); activeSession.setMaxTextMessageBufferSize(NitroConstants.WEBSOCKET_BUFFER_SIZE); @@ -78,7 +102,14 @@ public class NitroWebsocketClient implements NitroSession { public void onMessage(byte[] b, Session session) throws IOException { logger.debug("Received packet from browser"); - packetHandler.act(b); + // Enqueue all packets we receive to ensure we preserve correct packet order. + packetQueue.enqueue(b); + + // Flush everything if we are connected. + // We also flush when connection state changes to connected. + if (state.isConnected()) { + packetQueue.flush(); + } } @OnClose