using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Sockets;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using GodotTools.IdeMessaging.Requests;
using GodotTools.IdeMessaging.Utils;
namespace GodotTools.IdeMessaging
{
public sealed class Peer : IDisposable
{
///
/// Major version.
/// There is no forward nor backward compatibility between different major versions.
/// Connection is refused if client and server have different major versions.
///
public static readonly int ProtocolVersionMajor = Assembly.GetAssembly(typeof(Peer)).GetName().Version.Major;
///
/// Minor version, which clients must be backward compatible with.
/// Connection is refused if the client's minor version is lower than the server's.
///
public static readonly int ProtocolVersionMinor = Assembly.GetAssembly(typeof(Peer)).GetName().Version.Minor;
///
/// Revision, which doesn't affect compatibility.
///
public static readonly int ProtocolVersionRevision = Assembly.GetAssembly(typeof(Peer)).GetName().Version.Revision;
public const string ClientHandshakeName = "GodotIdeClient";
public const string ServerHandshakeName = "GodotIdeServer";
private const int ClientWriteTimeout = 8000;
public delegate Task RequestHandler(Peer peer, MessageContent content);
private readonly TcpClient tcpClient;
private readonly TextReader clientReader;
private readonly TextWriter clientWriter;
private readonly SemaphoreSlim writeSem = new SemaphoreSlim(1);
private string remoteIdentity = string.Empty;
public string RemoteIdentity => remoteIdentity;
public event Action Connected;
public event Action Disconnected;
private ILogger Logger { get; }
public bool IsDisposed { get; private set; }
public bool IsTcpClientConnected => tcpClient.Client != null && tcpClient.Client.Connected;
private bool IsConnected { get; set; }
private readonly IHandshake handshake;
private readonly IMessageHandler messageHandler;
private readonly Dictionary> requestAwaiterQueues = new Dictionary>();
private readonly SemaphoreSlim requestsSem = new SemaphoreSlim(1);
public Peer(TcpClient tcpClient, IHandshake handshake, IMessageHandler messageHandler, ILogger logger)
{
this.tcpClient = tcpClient;
this.handshake = handshake;
this.messageHandler = messageHandler;
Logger = logger;
NetworkStream clientStream = tcpClient.GetStream();
clientStream.WriteTimeout = ClientWriteTimeout;
clientReader = new StreamReader(clientStream, Encoding.UTF8);
clientWriter = new StreamWriter(clientStream, Encoding.UTF8) { NewLine = "\n" };
}
public async Task Process()
{
try
{
var decoder = new MessageDecoder();
string messageLine;
while ((messageLine = await ReadLine()) != null)
{
var state = decoder.Decode(messageLine, out var msg);
if (state == MessageDecoder.State.Decoding)
continue; // Not finished decoding yet
if (state == MessageDecoder.State.Errored)
{
Logger.LogError($"Received message line with invalid format: {messageLine}");
continue;
}
Logger.LogDebug($"Received message: {msg}");
try
{
if (msg.Kind == MessageKind.Request)
{
var responseContent = await messageHandler.HandleRequest(this, msg.Id, msg.Content, Logger);
await WriteMessage(new Message(MessageKind.Response, msg.Id, responseContent));
}
else if (msg.Kind == MessageKind.Response)
{
ResponseAwaiter responseAwaiter;
using (await requestsSem.UseAsync())
{
if (!requestAwaiterQueues.TryGetValue(msg.Id, out var queue) || queue.Count <= 0)
{
Logger.LogError($"Received unexpected response: {msg.Id}");
return;
}
responseAwaiter = queue.Dequeue();
}
responseAwaiter.SetResult(msg.Content);
}
else
{
throw new IndexOutOfRangeException($"Invalid message kind {msg.Kind}");
}
}
catch (Exception e)
{
Logger.LogError($"Message handler for '{msg}' failed with exception", e);
}
}
}
catch (Exception e)
{
if (!IsDisposed || !(e is SocketException || e.InnerException is SocketException))
{
Logger.LogError("Unhandled exception in the peer loop", e);
}
}
}
public async Task DoHandshake(string identity)
{
if (!await WriteLine(handshake.GetHandshakeLine(identity)))
{
Logger.LogError("Could not write handshake");
return false;
}
var readHandshakeTask = ReadLine();
if (await Task.WhenAny(readHandshakeTask, Task.Delay(8000)) != readHandshakeTask)
{
Logger.LogError("Timeout waiting for the client handshake");
return false;
}
string peerHandshake = await readHandshakeTask;
if (handshake == null || !handshake.IsValidPeerHandshake(peerHandshake, out remoteIdentity, Logger))
{
Logger.LogError("Received invalid handshake: " + peerHandshake);
return false;
}
IsConnected = true;
Connected?.Invoke();
Logger.LogInfo("Peer connection started");
return true;
}
private async Task ReadLine()
{
try
{
return await clientReader.ReadLineAsync();
}
catch (Exception e)
{
if (IsDisposed)
{
var se = e as SocketException ?? e.InnerException as SocketException;
if (se != null && se.SocketErrorCode == SocketError.Interrupted)
return null;
}
throw;
}
}
private Task WriteMessage(Message message)
{
Logger.LogDebug($"Sending message: {message}");
int bodyLineCount = message.Content.Body.Count(c => c == '\n');
bodyLineCount += 1; // Extra line break at the end
var builder = new StringBuilder();
builder.AppendLine(message.Kind.ToString());
builder.AppendLine(message.Id);
builder.AppendLine(message.Content.Status.ToString());
builder.AppendLine(bodyLineCount.ToString());
builder.AppendLine(message.Content.Body);
return WriteLine(builder.ToString());
}
public async Task SendRequest(string id, string body)
where TResponse : Response, new()
{
ResponseAwaiter responseAwaiter;
using (await requestsSem.UseAsync())
{
bool written = await WriteMessage(new Message(MessageKind.Request, id, new MessageContent(body)));
if (!written)
return null;
if (!requestAwaiterQueues.TryGetValue(id, out var queue))
{
queue = new Queue();
requestAwaiterQueues.Add(id, queue);
}
responseAwaiter = new ResponseAwaiter();
queue.Enqueue(responseAwaiter);
}
return (TResponse)await responseAwaiter;
}
private async Task WriteLine(string text)
{
if (clientWriter == null || IsDisposed || !IsTcpClientConnected)
return false;
using (await writeSem.UseAsync())
{
try
{
await clientWriter.WriteLineAsync(text);
await clientWriter.FlushAsync();
}
catch (Exception e)
{
if (!IsDisposed)
{
var se = e as SocketException ?? e.InnerException as SocketException;
if (se != null && se.SocketErrorCode == SocketError.Shutdown)
Logger.LogInfo("Client disconnected ungracefully");
else
Logger.LogError("Exception thrown when trying to write to client", e);
Dispose();
}
}
}
return true;
}
// ReSharper disable once UnusedMember.Global
public void ShutdownSocketSend()
{
tcpClient.Client.Shutdown(SocketShutdown.Send);
}
public void Dispose()
{
if (IsDisposed)
return;
IsDisposed = true;
if (IsTcpClientConnected)
{
if (IsConnected)
Disconnected?.Invoke();
}
clientReader?.Dispose();
clientWriter?.Dispose();
((IDisposable)tcpClient)?.Dispose();
}
}
}