implement asynchrone extensionsupport

This commit is contained in:
sirjonasxx 2020-06-11 20:25:51 +02:00
parent 7f4336cc61
commit 62a5f696dd
7 changed files with 174 additions and 112 deletions

View File

@ -7,6 +7,7 @@ import gearth.protocol.connection.proxy.ProxyProvider;
import gearth.protocol.connection.proxy.ProxyProviderFactory;
import gearth.protocol.connection.proxy.unix.LinuxRawIpProxyProvider;
import gearth.protocol.connection.proxy.windows.WindowsRawIpProxyProvider;
import gearth.services.extensionhandler.ExtensionHandler;
import java.io.IOException;
@ -15,6 +16,8 @@ public class HConnection {
public static volatile boolean DECRYPTPACKETS = true;
public static volatile boolean DEBUG = false;
private volatile ExtensionHandler extensionHandler = null;
private volatile Object[] trafficObservables = {new Observable<TrafficListener>(), new Observable<TrafficListener>(), new Observable<TrafficListener>()};
private volatile Observable<StateChangeListener> stateObservable = new Observable<>();
@ -100,6 +103,14 @@ public class HConnection {
((Observable<TrafficListener>) trafficObservables[2]).removeListener(listener);
}
public void setExtensionHandler(ExtensionHandler handler) {
this.extensionHandler = handler;
}
public ExtensionHandler getExtensionHandler() {
return extensionHandler;
}
public Object[] getTrafficObservables() {
return trafficObservables;
}

View File

@ -44,8 +44,8 @@ public abstract class ProxyProvider {
if (HConnection.DEBUG) System.out.println(server.getLocalAddress().getHostAddress() + ": " + server.getLocalPort());
Rc4Obtainer rc4Obtainer = new Rc4Obtainer(hConnection);
OutgoingPacketHandler outgoingHandler = new OutgoingPacketHandler(server.getOutputStream(), hConnection.getTrafficObservables());
IncomingPacketHandler incomingHandler = new IncomingPacketHandler(client.getOutputStream(), hConnection.getTrafficObservables(), outgoingHandler);
OutgoingPacketHandler outgoingHandler = new OutgoingPacketHandler(server.getOutputStream(), hConnection.getTrafficObservables(), hConnection.getExtensionHandler());
IncomingPacketHandler incomingHandler = new IncomingPacketHandler(client.getOutputStream(), hConnection.getTrafficObservables(), outgoingHandler, hConnection.getExtensionHandler());
rc4Obtainer.setPacketHandlers(outgoingHandler, incomingHandler);
Semaphore abort = new Semaphore(0);

View File

@ -4,6 +4,7 @@ import gearth.misc.listenerpattern.Observable;
import gearth.protocol.HMessage;
import gearth.protocol.HPacket;
import gearth.protocol.TrafficListener;
import gearth.services.extensionhandler.ExtensionHandler;
import java.io.IOException;
import java.io.OutputStream;
@ -11,8 +12,8 @@ import java.util.List;
public class IncomingPacketHandler extends PacketHandler {
public IncomingPacketHandler(OutputStream outputStream, Object[] trafficObservables, OutgoingPacketHandler outgoingHandler) {
super(outputStream, trafficObservables);
public IncomingPacketHandler(OutputStream outputStream, Object[] trafficObservables, OutgoingPacketHandler outgoingHandler, ExtensionHandler extensionHandler) {
super(outputStream, trafficObservables, extensionHandler);
TrafficListener listener = new TrafficListener() {
@Override

View File

@ -3,6 +3,7 @@ package gearth.protocol.packethandler;
import gearth.misc.listenerpattern.Observable;
import gearth.protocol.HMessage;
import gearth.protocol.HPacket;
import gearth.services.extensionhandler.ExtensionHandler;
import java.io.IOException;
import java.io.OutputStream;
@ -12,8 +13,8 @@ import java.util.function.Consumer;
public class OutgoingPacketHandler extends PacketHandler {
public OutgoingPacketHandler(OutputStream outputStream, Object[] trafficObservables) {
super(outputStream, trafficObservables);
public OutgoingPacketHandler(OutputStream outputStream, Object[] trafficObservables, ExtensionHandler extensionHandler) {
super(outputStream, trafficObservables, extensionHandler);
}

View File

@ -6,6 +6,8 @@ import gearth.protocol.HMessage;
import gearth.protocol.HPacket;
import gearth.protocol.TrafficListener;
import gearth.protocol.crypto.RC4;
import gearth.services.extensionhandler.ExtensionHandler;
import gearth.services.extensionhandler.OnHMessageHandled;
import java.io.IOException;
import java.io.OutputStream;
@ -18,6 +20,7 @@ public abstract class PacketHandler {
private volatile PayloadBuffer payloadBuffer = new PayloadBuffer();
private volatile OutputStream out;
private volatile ExtensionHandler extensionHandler;
private volatile Object[] trafficObservables; //get notified on packet send
private volatile boolean isTempBlocked = false;
volatile boolean isDataStream = false;
@ -33,8 +36,9 @@ public abstract class PacketHandler {
volatile boolean isEncryptedStream = false;
PacketHandler(OutputStream outputStream, Object[] trafficObservables) {
PacketHandler(OutputStream outputStream, Object[] trafficObservables, ExtensionHandler extensionHandler) {
this.trafficObservables = trafficObservables;
this.extensionHandler = extensionHandler;
out = outputStream;
}
@ -115,13 +119,11 @@ public abstract class PacketHandler {
* LISTENERS CAN EDIT THE MESSAGE BEFORE BEING SENT
* @param message
*/
void notifyListeners(HMessage message) {
for (int x = 0; x < 3; x++) {
((Observable<TrafficListener>) trafficObservables[x]).fireEvent(trafficListener -> {
message.getPacket().resetReadIndex();
trafficListener.onCapture(message);
});
}
private void notifyListeners(int i, HMessage message) {
((Observable<TrafficListener>) trafficObservables[i]).fireEvent(trafficListener -> {
message.getPacket().resetReadIndex();
trafficListener.onCapture(message);
});
message.getPacket().resetReadIndex();
}
@ -146,19 +148,32 @@ public abstract class PacketHandler {
for (HPacket hpacket : hpackets){
HMessage hMessage = new HMessage(hpacket, getMessageSide(), currentIndex);
boolean isencrypted = isEncryptedStream;
OnHMessageHandled afterExtensionIntercept = hMessage1 -> {
if (isDataStream) {
notifyListeners(2, hMessage1);
}
if (!hMessage1.isBlocked()) {
synchronized (sendLock) {
out.write(
(!isencrypted)
? hMessage1.getPacket().toBytes()
: encryptcipher.rc4(hMessage1.getPacket().toBytes())
);
}
}
};
if (isDataStream) {
notifyListeners(hMessage);
notifyListeners(0, hMessage);
notifyListeners(1, hMessage);
extensionHandler.handle(hMessage, afterExtensionIntercept);
}
else {
afterExtensionIntercept.finished(hMessage);
}
if (!hMessage.isBlocked()) {
synchronized (sendLock) {
out.write(
(!isencrypted)
? hMessage.getPacket().toBytes()
: encryptcipher.rc4(hMessage.getPacket().toBytes())
);
}
}
currentIndex++;
}
}

View File

@ -8,13 +8,14 @@ import gearth.protocol.HMessage;
import gearth.protocol.HPacket;
import gearth.protocol.connection.HState;
import gearth.services.extensionhandler.extensions.ExtensionListener;
import gearth.services.extensionhandler.extensions.GEarthExtension;
import gearth.services.extensionhandler.extensions.extensionproducers.ExtensionProducer;
import gearth.services.extensionhandler.extensions.extensionproducers.ExtensionProducerFactory;
import gearth.services.extensionhandler.extensions.extensionproducers.ExtensionProducerObserver;
import gearth.services.extensionhandler.extensions.GEarthExtension;
import javafx.util.Pair;
import java.io.IOException;
import java.util.*;
import java.util.function.Consumer;
public class ExtensionHandler {
@ -31,9 +32,15 @@ public class ExtensionHandler {
}
};
private final Map<HMessage, Set<GEarthExtension>> awaitManipulation = new HashMap<>();
private final Map<HMessage, OnHMessageHandled> finishManipulationCallback = new HashMap<>();
private final Map<Pair<HMessage.Direction, Integer>, HMessage> originalMessages = new HashMap<>();
private final Map<HMessage, HMessage> editedMessages = new HashMap<>();
private final TreeSet<HMessage> allAwaitingMessages = new TreeSet<>(Comparator.comparingInt(HMessage::getIndex));
public ExtensionHandler(HConnection hConnection) {
this.hConnection = hConnection;
hConnection.setExtensionHandler(this);
initialize();
}
@ -54,7 +61,7 @@ public class ExtensionHandler {
}
}
if (oldState == HState.CONNECTED) {
synchronized (hConnection) {
synchronized (gEarthExtensions) {
for (GEarthExtension extension : gEarthExtensions) {
extension.connectionEnd();
}
@ -62,93 +69,105 @@ public class ExtensionHandler {
}
});
hConnection.addTrafficListener(1, message -> {
Set<GEarthExtension> collection;
synchronized (gEarthExtensions) {
collection = new HashSet<>(gEarthExtensions);
}
HMessage result = new HMessage(message);
boolean[] isblock = new boolean[1];
synchronized (collection) {
for (GEarthExtension extension : collection) {
ExtensionListener respondCallback = new ExtensionListener() {
@Override
public void manipulatedPacket(HMessage responseMessage) {
if (responseMessage.getDestination() == message.getDestination() && responseMessage.getIndex() == message.getIndex()) {
synchronized (result) {
if (!message.equals(responseMessage)) {
result.constructFromHMessage(responseMessage);
}
if (responseMessage.isBlocked()) {
isblock[0] = true;
}
synchronized (collection) {
collection.remove(extension);
}
synchronized (extension) {
extension.getExtensionObservable().removeListener(this);
}
}
}
}
};
synchronized (extension) {
extension.getExtensionObservable().addListener(respondCallback);
}
}
}
Set<GEarthExtension> collection2;
synchronized (collection) {
collection2 = new HashSet<>(collection);
}
synchronized (collection2) {
for (GEarthExtension extension : collection2) {
synchronized (extension) {
extension.packetIntercept(new HMessage(message));
}
}
}
//block untill all extensions have responded
List<GEarthExtension> willdelete = new ArrayList<>();
while (true) {
synchronized (collection) {
if (collection.isEmpty()) {
break;
}
synchronized (gEarthExtensions) {
for (GEarthExtension extension : collection) {
if (!gEarthExtensions.contains(extension)) willdelete.add(extension);
}
}
for (int i = willdelete.size() - 1; i >= 0; i--) {
collection.remove(willdelete.get(i));
willdelete.remove(i);
}
}
try {Thread.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}
}
message.constructFromHMessage(result);
if (isblock[0]) {
message.setBlocked(true);
}
});
extensionProducers = ExtensionProducerFactory.getAll();
extensionProducers.forEach(this::initializeExtensionProducer);
}
private final Object hMessageStuffLock = new Object();
private void onExtensionRespond(GEarthExtension extension, HMessage edited) {
HMessage hMessage;
synchronized (hMessageStuffLock) {
Pair<HMessage.Direction, Integer> msgDirAndId = new Pair<>(edited.getDestination(), edited.getIndex());
hMessage = originalMessages.get(msgDirAndId);
if (awaitManipulation.containsKey(hMessage)) {
awaitManipulation.get(hMessage).remove(extension);
boolean wasBlocked = hMessage.isBlocked() ||
(editedMessages.get(hMessage) != null && editedMessages.get(hMessage).isBlocked());
if (!hMessage.equals(edited)) {
editedMessages.put(hMessage, edited);
if (wasBlocked) {
editedMessages.get(hMessage).setBlocked(true);
}
}
}
else {
hMessage = null;
}
}
if (hMessage != null) {
maybeFinishHmessage(hMessage);
}
}
private void onExtensionRemoved(GEarthExtension extension) {
List<HMessage> awaiting;
synchronized (hMessageStuffLock) {
awaiting = new ArrayList<>(allAwaitingMessages);
}
for (HMessage hMessage : awaiting) {
synchronized (hMessageStuffLock) {
awaitManipulation.get(hMessage).remove(extension);
}
maybeFinishHmessage(hMessage);
}
}
// argument is the original hmessage, not an edited one
private void maybeFinishHmessage(HMessage hMessage) {
OnHMessageHandled maybeCallback = null;
HMessage result = null;
synchronized (hMessageStuffLock) {
if (hMessage != null && awaitManipulation.containsKey(hMessage)) {
boolean isFinished = awaitManipulation.get(hMessage).isEmpty();
if (isFinished) {
awaitManipulation.remove(hMessage);
result = editedMessages.get(hMessage) == null ? hMessage : editedMessages.get(hMessage);
editedMessages.remove(hMessage);
originalMessages.remove(new Pair<>(result.getDestination(), result.getIndex()));
allAwaitingMessages.remove(hMessage);
maybeCallback = finishManipulationCallback.remove(hMessage);
}
}
}
if (maybeCallback != null) {
try {
maybeCallback.finished(result);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void handle(HMessage hMessage, OnHMessageHandled callback) {
synchronized (hMessageStuffLock) {
Pair<HMessage.Direction, Integer> msgDirectionAndId = new Pair<>(hMessage.getDestination(), hMessage.getIndex());
originalMessages.put(msgDirectionAndId, hMessage);
finishManipulationCallback.put(hMessage, callback);
editedMessages.put(hMessage, null);
allAwaitingMessages.add(hMessage);
synchronized (gEarthExtensions) {
awaitManipulation.put(hMessage, new HashSet<>(gEarthExtensions));
for (GEarthExtension extension : gEarthExtensions) {
extension.packetIntercept(hMessage);
}
}
}
maybeFinishHmessage(hMessage);
}
private void initializeExtensionProducer(ExtensionProducer producer) {
producer.startProducing(new ExtensionProducerObserver() {
@Override
@ -179,9 +198,15 @@ public class ExtensionHandler {
synchronized (gEarthExtensions) {
gEarthExtensions.remove(extension);
}
onExtensionRemoved(extension);
extension.getExtensionObservable().removeListener(this);
extension.getDeletedObservable().fireEvent();
}
@Override
protected void manipulatedPacket(HMessage hMessage) {
onExtensionRespond(extension, hMessage);
}
};
extension.getExtensionObservable().addListener(listener);
@ -204,11 +229,9 @@ public class ExtensionHandler {
});
}
public List<ExtensionProducer> getExtensionProducers() {
return extensionProducers;
}
public Observable<ExtensionConnectedListener> getObservable() {
return observable;
}

View File

@ -0,0 +1,11 @@
package gearth.services.extensionhandler;
import gearth.protocol.HMessage;
import java.io.IOException;
public interface OnHMessageHandled {
void finished(HMessage hMessage) throws IOException;
}