Fix discarded packets

This commit is contained in:
UnfamiliarLegacy 2023-06-07 19:59:55 +02:00
parent c7037c0441
commit 1579bed1ef
3 changed files with 71 additions and 4 deletions

View File

@ -30,16 +30,24 @@ public class NitroConnectionState {
this.checkConnected(); this.checkConnected();
} }
public void checkConnected() { public boolean isConnected() {
if (this.aborting) { if (this.aborting) {
return; return false;
} }
if (!this.toClient) { if (!this.toClient) {
return; return false;
} }
if (!this.toServer) { if (!this.toServer) {
return false;
}
return true;
}
private void checkConnected() {
if (!this.isConnected()) {
return; return;
} }

View File

@ -0,0 +1,28 @@
package gearth.protocol.connection.proxy.nitro;
import gearth.protocol.packethandler.nitro.NitroPacketHandler;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Queue;
public class NitroPacketQueue {
private final NitroPacketHandler packetHandler;
private final Queue<byte[]> packets;
public NitroPacketQueue(NitroPacketHandler packetHandler) {
this.packetHandler = packetHandler;
this.packets = new LinkedList<>();
}
public void enqueue(byte[] b) {
this.packets.add(b);
}
public synchronized void flush() throws IOException {
while (!this.packets.isEmpty()) {
this.packetHandler.act(this.packets.remove());
}
}
}

View File

@ -2,9 +2,11 @@ package gearth.protocol.connection.proxy.nitro.websocket;
import gearth.protocol.HConnection; import gearth.protocol.HConnection;
import gearth.protocol.HMessage; import gearth.protocol.HMessage;
import gearth.protocol.StateChangeListener;
import gearth.protocol.connection.*; import gearth.protocol.connection.*;
import gearth.protocol.connection.proxy.nitro.NitroConnectionState; import gearth.protocol.connection.proxy.nitro.NitroConnectionState;
import gearth.protocol.connection.proxy.nitro.NitroConstants; import gearth.protocol.connection.proxy.nitro.NitroConstants;
import gearth.protocol.connection.proxy.nitro.NitroPacketQueue;
import gearth.protocol.connection.proxy.nitro.NitroProxyProvider; import gearth.protocol.connection.proxy.nitro.NitroProxyProvider;
import gearth.protocol.packethandler.nitro.NitroPacketHandler; import gearth.protocol.packethandler.nitro.NitroPacketHandler;
import org.eclipse.jetty.websocket.jsr356.JsrSession; import org.eclipse.jetty.websocket.jsr356.JsrSession;
@ -26,20 +28,24 @@ public class NitroWebsocketClient implements NitroSession {
private static final Logger logger = LoggerFactory.getLogger(NitroWebsocketClient.class); private static final Logger logger = LoggerFactory.getLogger(NitroWebsocketClient.class);
private final HProxySetter proxySetter; private final HProxySetter proxySetter;
private final HConnection connection;
private final NitroConnectionState state; private final NitroConnectionState state;
private final NitroProxyProvider proxyProvider; private final NitroProxyProvider proxyProvider;
private final NitroWebsocketServer server; private final NitroWebsocketServer server;
private final NitroPacketHandler packetHandler; private final NitroPacketHandler packetHandler;
private final NitroPacketQueue packetQueue;
private final AtomicBoolean shutdownLock; private final AtomicBoolean shutdownLock;
private JsrSession activeSession = null; private JsrSession activeSession = null;
public NitroWebsocketClient(HProxySetter proxySetter, HStateSetter stateSetter, HConnection connection, NitroProxyProvider proxyProvider) { public NitroWebsocketClient(HProxySetter proxySetter, HStateSetter stateSetter, HConnection connection, NitroProxyProvider proxyProvider) {
this.proxySetter = proxySetter; this.proxySetter = proxySetter;
this.connection = connection;
this.state = new NitroConnectionState(stateSetter); this.state = new NitroConnectionState(stateSetter);
this.proxyProvider = proxyProvider; this.proxyProvider = proxyProvider;
this.server = new NitroWebsocketServer(connection, this, this.state); this.server = new NitroWebsocketServer(connection, this, this.state);
this.packetHandler = new NitroPacketHandler(HMessage.Direction.TOSERVER, server, connection.getExtensionHandler(), connection.getTrafficObservables()); this.packetHandler = new NitroPacketHandler(HMessage.Direction.TOSERVER, server, connection.getExtensionHandler(), connection.getTrafficObservables());
this.packetQueue = new NitroPacketQueue(this.packetHandler);
this.shutdownLock = new AtomicBoolean(); this.shutdownLock = new AtomicBoolean();
} }
@ -47,6 +53,24 @@ public class NitroWebsocketClient implements NitroSession {
public void onOpen(Session session) throws Exception { public void onOpen(Session session) throws Exception {
logger.info("WebSocket connection accepted"); logger.info("WebSocket connection accepted");
// Setup state change listener
connection.getStateObservable().addListener(new StateChangeListener() {
@Override
public void stateChanged(HState oldState, HState newState) {
// Clean up when we don't need it anymore.
if ((oldState == HState.WAITING_FOR_CLIENT || newState == HState.NOT_CONNECTED) || newState == HState.ABORTING) {
connection.getStateObservable().removeListener(this);
}
// Process queue when connected.
try {
packetQueue.flush();
} catch (IOException e) {
logger.error("Failed to flush packet queue in state change listener", e);
}
}
});
activeSession = (JsrSession) session; activeSession = (JsrSession) session;
activeSession.setMaxBinaryMessageBufferSize(NitroConstants.WEBSOCKET_BUFFER_SIZE); activeSession.setMaxBinaryMessageBufferSize(NitroConstants.WEBSOCKET_BUFFER_SIZE);
activeSession.setMaxTextMessageBufferSize(NitroConstants.WEBSOCKET_BUFFER_SIZE); activeSession.setMaxTextMessageBufferSize(NitroConstants.WEBSOCKET_BUFFER_SIZE);
@ -78,7 +102,14 @@ public class NitroWebsocketClient implements NitroSession {
public void onMessage(byte[] b, Session session) throws IOException { public void onMessage(byte[] b, Session session) throws IOException {
logger.debug("Received packet from browser"); logger.debug("Received packet from browser");
packetHandler.act(b); // Enqueue all packets we receive to ensure we preserve correct packet order.
packetQueue.enqueue(b);
// Flush everything if we are connected.
// We also flush when connection state changes to connected.
if (state.isConnected()) {
packetQueue.flush();
}
} }
@OnClose @OnClose