Fixed issues with network extensions rewrite, see desc

- specified NioServerSocketChannel in ServerBootstrap
- moved TCP_NO_DELAY option to child channels in ServerBootstrap
- swapped outgoing/incoming packet structure registration (also added some documentation)
This commit is contained in:
dorving 2022-07-12 00:10:46 +02:00
parent 047d948086
commit c603cdbd61
3 changed files with 113 additions and 77 deletions

View File

@ -98,6 +98,8 @@ public final class NetworkExtensionClient extends GEarthExtension {
channel.close(); channel.close();
} catch (Exception e){ } catch (Exception e){
LOGGER.error("Failed to close client (channel={})", channel, e); LOGGER.error("Failed to close client (channel={})", channel, e);
} finally {
hasClosed();
} }
} }

View File

@ -103,7 +103,84 @@ public final class NetworkExtensionCodec {
} }
static { static {
// incoming registerOutgoingMessages();
registerIncomingMessages();
}
private static void registerIncomingMessages() {
register(Incoming.ExtensionInfo.HEADER_ID,
Incoming.ExtensionInfo.class,
(message, hPacket) -> {
hPacket.appendString(message.getTitle());
hPacket.appendString(message.getAuthor());
hPacket.appendString(message.getVersion());
hPacket.appendString(message.getDescription());
hPacket.appendBoolean(message.isOnClickUsed());
hPacket.appendBoolean(message.getFile() != null);
hPacket.appendString(Optional.ofNullable(message.getFile()).orElse(""));
hPacket.appendString(Optional.ofNullable(message.getCookie()).orElse(""));
hPacket.appendBoolean(message.isCanLeave());
hPacket.appendBoolean(message.isCanDelete());
},
(hPacket -> {
final String title = hPacket.readString();
final String author = hPacket.readString();
final String version = hPacket.readString();
final String description = hPacket.readString();
final boolean isOnClickUsed = hPacket.readBoolean();
final boolean hasFile = hPacket.readBoolean();
String file = hPacket.readString();
if (!hasFile)
file = null;
String cookie = hPacket.readString();
if (cookie.isEmpty())
cookie = null;
final boolean canLeave = hPacket.readBoolean();
final boolean canDelete = hPacket.readBoolean();
return new Incoming.ExtensionInfo(title, author, version, description, isOnClickUsed, file, cookie, canLeave, canDelete);
}));
register(Incoming.ManipulatedPacket.MANIPULATED_PACKET,
Incoming.ManipulatedPacket.class,
(message, hPacket) -> hPacket.appendLongString(message.gethMessage().stringify()),
(hPacket -> {
final String packetString = hPacket.readLongString(6);
final HMessage hMessage = new HMessage(packetString);
return new Incoming.ManipulatedPacket(hMessage);
}));
register(Incoming.SendMessage.HEADER_ID,
Incoming.SendMessage.class,
((message, hPacket) -> {
hPacket.appendByte((byte) (message.getDirection() == TOCLIENT ? 0 : 1));
hPacket.appendInt(message.getPacket().getBytesLength());
hPacket.appendBytes(message.getPacket().toBytes());
}),
(hPacket -> {
final byte side = hPacket.readByte();
final int length = hPacket.readInteger();
final byte[] data = hPacket.readBytes(length);
final HPacket packet = new HPacket(data);
return new Incoming.SendMessage(packet, side == 0 ? TOCLIENT : TOSERVER);
}));
register(Incoming.RequestFlags.HEADER_ID,
Incoming.RequestFlags.class,
(message, hPacket) -> {
},
(hPacket -> new Incoming.RequestFlags()));
register(Incoming.ExtensionConsoleLog.HEADER_ID,
Incoming.ExtensionConsoleLog.class,
(message, hPacket) -> hPacket.appendString(message.getContents()),
(hPacket -> new Incoming.ExtensionConsoleLog(hPacket.readString())));
register(Incoming.PacketToStringRequest.HEADER_ID,
Incoming.PacketToStringRequest.class,
(message, hPacket) -> hPacket.appendLongString(message.getString()),
(hPacket -> new Incoming.PacketToStringRequest(hPacket.readLongString())));
register(Incoming.StringToPacketRequest.HEADER_ID,
Incoming.StringToPacketRequest.class,
(message, hPacket) -> hPacket.appendLongString(message.getString(), StandardCharsets.UTF_8),
(hPacket -> new Incoming.StringToPacketRequest(hPacket.readLongString(StandardCharsets.UTF_8))));
}
private static void registerOutgoingMessages() {
register(Outgoing.InfoRequest.HEADER_ID, register(Outgoing.InfoRequest.HEADER_ID,
Outgoing.InfoRequest.class, Outgoing.InfoRequest.class,
(message, hPacket) -> { (message, hPacket) -> {
@ -178,88 +255,25 @@ public final class NetworkExtensionCodec {
(message, hPacket) -> hPacket.appendLongString(message.getString()), (message, hPacket) -> hPacket.appendLongString(message.getString()),
(hPacket -> new Outgoing.StringToPacketResponse(hPacket.readLongString())) (hPacket -> new Outgoing.StringToPacketResponse(hPacket.readLongString()))
); );
// outgoing
register(Incoming.ExtensionInfo.HEADER_ID,
Incoming.ExtensionInfo.class,
(message, hPacket) -> {
hPacket.appendString(message.getTitle());
hPacket.appendString(message.getAuthor());
hPacket.appendString(message.getVersion());
hPacket.appendString(message.getDescription());
hPacket.appendBoolean(message.isOnClickUsed());
hPacket.appendBoolean(message.getFile() != null);
hPacket.appendString(Optional.ofNullable(message.getFile()).orElse(""));
hPacket.appendString(Optional.ofNullable(message.getCookie()).orElse(""));
hPacket.appendBoolean(message.isCanLeave());
hPacket.appendBoolean(message.isCanDelete());
},
(hPacket -> {
final String title = hPacket.readString();
final String author = hPacket.readString();
final String version = hPacket.readString();
final String description = hPacket.readString();
final boolean isOnClickUsed = hPacket.readBoolean();
final boolean hasFile = hPacket.readBoolean();
String file = hPacket.readString();
if (!hasFile)
file = null;
String cookie = hPacket.readString();
if (cookie.isEmpty())
cookie = null;
final boolean canLeave = hPacket.readBoolean();
final boolean canDelete = hPacket.readBoolean();
return new Incoming.ExtensionInfo(title, author, version, description, isOnClickUsed, file, cookie, canLeave, canDelete);
}));
register(Incoming.ManipulatedPacket.MANIPULATED_PACKET,
Incoming.ManipulatedPacket.class,
(message, hPacket) -> hPacket.appendLongString(message.gethMessage().stringify()),
(hPacket -> {
final String packetString = hPacket.readLongString(6);
final HMessage hMessage = new HMessage(packetString);
return new Incoming.ManipulatedPacket(hMessage);
}));
register(Incoming.SendMessage.HEADER_ID,
Incoming.SendMessage.class,
((message, hPacket) -> {
hPacket.appendByte((byte) (message.getDirection() == TOCLIENT ? 0 : 1));
hPacket.appendInt(message.getPacket().getBytesLength());
hPacket.appendBytes(message.getPacket().toBytes());
}),
(hPacket -> {
final byte side = hPacket.readByte();
final int length = hPacket.readInteger();
final byte[] data = hPacket.readBytes(length);
final HPacket packet = new HPacket(data);
return new Incoming.SendMessage(packet, side == 0 ? TOCLIENT : TOSERVER);
}));
register(Incoming.RequestFlags.HEADER_ID,
Incoming.RequestFlags.class,
(message, hPacket) -> {
},
(hPacket -> new Incoming.RequestFlags()));
register(Incoming.ExtensionConsoleLog.HEADER_ID,
Incoming.ExtensionConsoleLog.class,
(message, hPacket) -> hPacket.appendString(message.getContents()),
(hPacket -> new Incoming.ExtensionConsoleLog(hPacket.readString())));
register(Incoming.PacketToStringRequest.HEADER_ID,
Incoming.PacketToStringRequest.class,
(message, hPacket) -> hPacket.appendLongString(message.getString()),
(hPacket -> new Incoming.PacketToStringRequest(hPacket.readLongString())));
register(Incoming.StringToPacketRequest.HEADER_ID,
Incoming.StringToPacketRequest.class,
(message, hPacket) -> hPacket.appendLongString(message.getString(), StandardCharsets.UTF_8),
(hPacket -> new Incoming.StringToPacketRequest(hPacket.readLongString(StandardCharsets.UTF_8))));
} }
private static <T extends NetworkExtensionMessage> void register(final int headerId, Class<T> tClass, BiConsumer<T, HPacket> writer, Function<HPacket, T> reader) { private static <T extends NetworkExtensionMessage> void register(final int headerId, Class<T> tClass, BiConsumer<T, HPacket> writer, Function<HPacket, T> reader) {
final PacketStructure packetStructure = new PacketStructure(headerId, tClass.getSimpleName(), writer, reader); final PacketStructure packetStructure = new PacketStructure(headerId, tClass.getSimpleName(), writer, reader);
if (tClass.getSuperclass() == Outgoing.class) if (tClass.getSuperclass() == Outgoing.class)
incomingPacketStructures.put(headerId, packetStructure);
else
outgoingPacketStructures.put(tClass, packetStructure); outgoingPacketStructures.put(tClass, packetStructure);
else
incomingPacketStructures.put(headerId, packetStructure);
} }
/**
* Represents the packet structure of a {@link NetworkExtensionMessage}.
*
* Can be used to {@link PacketStructure#writer write} messages to packets
* and {@link PacketStructure#reader read} messages from packets.
*
* @apiNote At the moment both outgoing and incoming messages have a reader and writer defined,
* this is so that in the future the same codec can be used for the Extensions API.
*/
static class PacketStructure { static class PacketStructure {
private final int headerId; private final int headerId;

View File

@ -10,6 +10,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.*; import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder; import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.Attribute; import io.netty.util.Attribute;
@ -21,6 +22,7 @@ import java.io.IOException;
import java.net.DatagramSocket; import java.net.DatagramSocket;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import static gearth.services.extension_handler.extensions.implementations.network.NetworkExtensionMessage.*; import static gearth.services.extension_handler.extensions.implementations.network.NetworkExtensionMessage.*;
@ -52,8 +54,9 @@ public final class NetworkExtensionServer implements ExtensionProducer {
public void startProducing(ExtensionProducerObserver observer) { public void startProducing(ExtensionProducerObserver observer) {
final ServerBootstrap bootstrap = new ServerBootstrap() final ServerBootstrap bootstrap = new ServerBootstrap()
.option(ChannelOption.TCP_NODELAY, true) .channel(NioServerSocketChannel.class)
.childHandler(new Initializer(observer)) .childHandler(new Initializer(observer))
.childOption(ChannelOption.TCP_NODELAY, true)
.group(new NioEventLoopGroup()); .group(new NioEventLoopGroup());
port = PORT_ONSET; port = PORT_ONSET;
@ -141,6 +144,10 @@ public final class NetworkExtensionServer implements ExtensionProducer {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
switch (stage) { switch (stage) {
case LENGTH: case LENGTH:
if (in.readableBytes() < HEADER_LENGTH)
return;
payloadLength = in.readInt(); payloadLength = in.readInt();
stage = Stage.PAYLOAD; stage = Stage.PAYLOAD;
break; break;
@ -219,6 +226,7 @@ public final class NetworkExtensionServer implements ExtensionProducer {
@Override @Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
LOGGER.trace("Channel unregistered (channel={})", ctx.channel()); LOGGER.trace("Channel unregistered (channel={})", ctx.channel());
close(ctx);
super.channelUnregistered(ctx); super.channelUnregistered(ctx);
} }
@ -251,7 +259,19 @@ public final class NetworkExtensionServer implements ExtensionProducer {
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOGGER.error("Channel exception caught (channel={}), closing channel", ctx.channel(), cause); LOGGER.error("Channel exception caught (channel={}), closing channel", ctx.channel(), cause);
close(ctx);
}
private void close(ChannelHandlerContext ctx) {
final Optional<NetworkExtensionClient> optionalClient = findClient(ctx);
if (optionalClient.isPresent())
optionalClient.get().close();
else
ctx.channel().close(); ctx.channel().close();
} }
private Optional<NetworkExtensionClient> findClient(ChannelHandlerContext ctx) {
return Optional.ofNullable(ctx.attr(CLIENT).get());
}
} }
} }