diff --git a/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/NitroConnectionState.java b/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/NitroConnectionState.java new file mode 100644 index 0000000..c5381f7 --- /dev/null +++ b/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/NitroConnectionState.java @@ -0,0 +1,66 @@ +package gearth.protocol.connection.proxy.nitro; + +import gearth.protocol.HMessage; +import gearth.protocol.connection.HState; +import gearth.protocol.connection.HStateSetter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NitroConnectionState { + + private static final Logger logger = LoggerFactory.getLogger(NitroConnectionState.class); + + private final HStateSetter stateSetter; + + private boolean aborting; + private boolean toServer; + private boolean toClient; + + public NitroConnectionState(HStateSetter stateSetter) { + this.stateSetter = stateSetter; + } + + public void setConnected(HMessage.Direction direction) { + if (direction == HMessage.Direction.TOCLIENT) { + this.toClient = true; + } else if (direction == HMessage.Direction.TOSERVER) { + this.toServer = true; + } + + this.checkConnected(); + } + + public boolean isConnected() { + if (this.aborting) { + return false; + } + + if (!this.toClient) { + return false; + } + + if (!this.toServer) { + return false; + } + + return true; + } + + private void checkConnected() { + if (!this.isConnected()) { + return; + } + + this.stateSetter.setState(HState.CONNECTED); + + logger.info("Connected"); + } + + public void setAborting() { + this.aborting = true; + this.stateSetter.setState(HState.ABORTING); + + logger.info("Aborting"); + } + +} diff --git a/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/NitroPacketQueue.java b/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/NitroPacketQueue.java new file mode 100644 index 0000000..9fc2869 --- /dev/null +++ b/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/NitroPacketQueue.java @@ -0,0 +1,28 @@ +package gearth.protocol.connection.proxy.nitro; + +import gearth.protocol.packethandler.nitro.NitroPacketHandler; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.Queue; + +public class NitroPacketQueue { + + private final NitroPacketHandler packetHandler; + private final Queue packets; + + public NitroPacketQueue(NitroPacketHandler packetHandler) { + this.packetHandler = packetHandler; + this.packets = new LinkedList<>(); + } + + public void enqueue(byte[] b) { + this.packets.add(b); + } + + public synchronized void flush() throws IOException { + while (!this.packets.isEmpty()) { + this.packetHandler.act(this.packets.remove()); + } + } +} diff --git a/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/websocket/NitroWebsocketClient.java b/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/websocket/NitroWebsocketClient.java index 7ebfc7d..8740fd9 100644 --- a/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/websocket/NitroWebsocketClient.java +++ b/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/websocket/NitroWebsocketClient.java @@ -2,8 +2,11 @@ package gearth.protocol.connection.proxy.nitro.websocket; import gearth.protocol.HConnection; import gearth.protocol.HMessage; +import gearth.protocol.StateChangeListener; import gearth.protocol.connection.*; +import gearth.protocol.connection.proxy.nitro.NitroConnectionState; import gearth.protocol.connection.proxy.nitro.NitroConstants; +import gearth.protocol.connection.proxy.nitro.NitroPacketQueue; import gearth.protocol.connection.proxy.nitro.NitroProxyProvider; import gearth.protocol.packethandler.nitro.NitroPacketHandler; import org.eclipse.jetty.websocket.jsr356.JsrSession; @@ -25,22 +28,24 @@ public class NitroWebsocketClient implements NitroSession { private static final Logger logger = LoggerFactory.getLogger(NitroWebsocketClient.class); private final HProxySetter proxySetter; - private final HStateSetter stateSetter; private final HConnection connection; + private final NitroConnectionState state; private final NitroProxyProvider proxyProvider; private final NitroWebsocketServer server; private final NitroPacketHandler packetHandler; + private final NitroPacketQueue packetQueue; private final AtomicBoolean shutdownLock; private JsrSession activeSession = null; public NitroWebsocketClient(HProxySetter proxySetter, HStateSetter stateSetter, HConnection connection, NitroProxyProvider proxyProvider) { this.proxySetter = proxySetter; - this.stateSetter = stateSetter; this.connection = connection; + this.state = new NitroConnectionState(stateSetter); this.proxyProvider = proxyProvider; - this.server = new NitroWebsocketServer(connection, this); + this.server = new NitroWebsocketServer(connection, this, this.state); this.packetHandler = new NitroPacketHandler(HMessage.Direction.TOSERVER, server, connection.getExtensionHandler(), connection.getTrafficObservables()); + this.packetQueue = new NitroPacketQueue(this.packetHandler); this.shutdownLock = new AtomicBoolean(); } @@ -48,6 +53,24 @@ public class NitroWebsocketClient implements NitroSession { public void onOpen(Session session) throws Exception { logger.info("WebSocket connection accepted"); + // Setup state change listener + connection.getStateObservable().addListener(new StateChangeListener() { + @Override + public void stateChanged(HState oldState, HState newState) { + // Clean up when we don't need it anymore. + if ((oldState == HState.WAITING_FOR_CLIENT || newState == HState.NOT_CONNECTED) || newState == HState.ABORTING) { + connection.getStateObservable().removeListener(this); + } + + // Process queue when connected. + try { + packetQueue.flush(); + } catch (IOException e) { + logger.error("Failed to flush packet queue in state change listener", e); + } + } + }); + activeSession = (JsrSession) session; activeSession.setMaxBinaryMessageBufferSize(NitroConstants.WEBSOCKET_BUFFER_SIZE); activeSession.setMaxTextMessageBufferSize(NitroConstants.WEBSOCKET_BUFFER_SIZE); @@ -72,12 +95,21 @@ public class NitroWebsocketClient implements NitroSession { ); proxySetter.setProxy(proxy); - stateSetter.setState(HState.CONNECTED); + state.setConnected(HMessage.Direction.TOSERVER); } @OnMessage public void onMessage(byte[] b, Session session) throws IOException { - packetHandler.act(b); + logger.debug("Received packet from browser"); + + // Enqueue all packets we receive to ensure we preserve correct packet order. + packetQueue.enqueue(b); + + // Flush everything if we are connected. + // We also flush when connection state changes to connected. + if (state.isConnected()) { + packetQueue.flush(); + } } @OnClose @@ -133,7 +165,7 @@ public class NitroWebsocketClient implements NitroSession { // Reset program state. proxySetter.setProxy(null); - stateSetter.setState(HState.ABORTING); + state.setAborting(); } } } diff --git a/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/websocket/NitroWebsocketServer.java b/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/websocket/NitroWebsocketServer.java index 37ca63d..1177fa8 100644 --- a/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/websocket/NitroWebsocketServer.java +++ b/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/websocket/NitroWebsocketServer.java @@ -2,10 +2,13 @@ package gearth.protocol.connection.proxy.nitro.websocket; import gearth.protocol.HConnection; import gearth.protocol.HMessage; +import gearth.protocol.connection.proxy.nitro.NitroConnectionState; import gearth.protocol.connection.proxy.nitro.NitroConstants; import gearth.protocol.packethandler.PacketHandler; import gearth.protocol.packethandler.nitro.NitroPacketHandler; import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.http.HttpField; +import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.WebSocketListener; @@ -36,15 +39,21 @@ public class NitroWebsocketServer implements WebSocketListener, NitroSession { "Sec-WebSocket-Version", "Host", "Connection", - "Upgrade" + "Upgrade", + "User-Agent", // Added by default + "Accept-Encoding", // Added by default + "Cache-Control", // Added by default + "Pragma" // Added by default )); private final PacketHandler packetHandler; private final NitroWebsocketClient client; + private final NitroConnectionState state; private Session activeSession = null; - public NitroWebsocketServer(HConnection connection, NitroWebsocketClient client) { + public NitroWebsocketServer(HConnection connection, NitroWebsocketClient client, NitroConnectionState state) { this.client = client; + this.state = state; this.packetHandler = new NitroPacketHandler(HMessage.Direction.TOCLIENT, client, connection.getExtensionHandler(), connection.getTrafficObservables()); } @@ -56,6 +65,8 @@ public class NitroWebsocketServer implements WebSocketListener, NitroSession { final ClientUpgradeRequest request = new ClientUpgradeRequest(); + request.addExtensions("permessage-deflate"); + clientHeaders.forEach((key, value) -> { if (SKIP_HEADERS.contains(key)) { return; @@ -64,12 +75,17 @@ public class NitroWebsocketServer implements WebSocketListener, NitroSession { request.setHeader(key, value); }); + if (clientHeaders.containsKey("User-Agent")) { + final String realUserAgent = clientHeaders.get(HttpHeader.USER_AGENT.toString()).get(0); + final HttpField clientUserAgent = new HttpField(HttpHeader.USER_AGENT, realUserAgent); + + client.getHttpClient().setUserAgentField(clientUserAgent); + } + logger.info("Connecting to origin websocket at {}", websocketUrl); client.start(); client.connect(this, URI.create(websocketUrl), request); - - logger.info("Connected to origin websocket"); } catch (Exception e) { throw new IOException("Failed to start websocket client to origin " + websocketUrl, e); } @@ -124,6 +140,7 @@ public class NitroWebsocketServer implements WebSocketListener, NitroSession { @Override public void onWebSocketConnect(org.eclipse.jetty.websocket.api.Session session) { activeSession = session; + state.setConnected(HMessage.Direction.TOCLIENT); } @Override diff --git a/G-Earth/src/main/java/gearth/protocol/packethandler/nitro/NitroPacketHandler.java b/G-Earth/src/main/java/gearth/protocol/packethandler/nitro/NitroPacketHandler.java index fc72f34..853127b 100644 --- a/G-Earth/src/main/java/gearth/protocol/packethandler/nitro/NitroPacketHandler.java +++ b/G-Earth/src/main/java/gearth/protocol/packethandler/nitro/NitroPacketHandler.java @@ -35,6 +35,7 @@ public class NitroPacketHandler extends PacketHandler { final Session localSession = session.getSession(); if (localSession == null) { + logger.warn("Discarding {} bytes because the session for direction {} was null", buffer.length, this.direction); return false; }