< Summary - Combined Code Coverage

Information
Class: NLightning.Infrastructure.Transport.Services.TransportService
Assembly: NLightning.Infrastructure
File(s): /home/runner/work/nlightning/nlightning/src/NLightning.Infrastructure/Transport/Services/TransportService.cs
Tag: 38_17925369700
Line coverage
36%
Covered lines: 69
Uncovered lines: 118
Coverable lines: 187
Total lines: 364
Line coverage: 36.8%
Branch coverage
21%
Covered branches: 18
Total branches: 82
Branch coverage: 21.9%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
.ctor(...) | 100% | 1.04 | 1 | 65%
get_IsInitiator() | 100% | 2 | 1 | 0%
get_IsConnected() | 100% | 2 | 1 | 0%
get_RemoteStaticPublicKey() | 100% | 1 | 1 | 100%
.ctor(...) | 100% | 2 | 1 | 0%
InitializeAsync() | 32.14% | 73.22 | 22 | 52.7%
WriteMessageAsync() | 0% | 42 | 6 | 0%
IsSocketConnected() | 50% | 12.17 | 6 | 44.44%
ReadResponseAsync() | 11.76% | 746.69 | 34 | 14.89%
Dispose() | 100% | 2 | 1 | 0%
Dispose(...) | 25% | 21.82 | 8 | 40%
Finalize() | 100% | 1 | 1 | 100%

File(s)

/home/runner/work/nlightning/nlightning/src/NLightning.Infrastructure/Transport/Services/TransportService.cs

#LineLine coverage
 1using System.Buffers;
 2using System.Net.Sockets;
 3using Microsoft.Extensions.Logging;
 4
 5namespace NLightning.Infrastructure.Transport.Services;
 6
 7using Crypto.Interfaces;
 8using Domain.Crypto.ValueObjects;
 9using Domain.Exceptions;
 10using Domain.Protocol.Interfaces;
 11using Domain.Serialization.Interfaces;
 12using Domain.Transport;
 13using Exceptions;
 14using Interfaces;
 15using Protocol.Constants;
 16
/// <summary>
/// Performs the handshake over an already-connected <see cref="TcpClient"/> and, once it has completed,
/// encrypts outgoing messages and raises <see cref="MessageReceived"/> for every decrypted incoming message.
/// </summary>
internal sealed class TransportService : ITransportService
{
    /// <summary>Number of bytes exchanged for handshake Acts One and Two.</summary>
    private const int ActOneAndTwoLength = 50;

    /// <summary>Number of bytes exchanged for handshake Act Three.</summary>
    private const int ActThreeLength = 66;

    private readonly CancellationTokenSource _cts = new();
    private readonly ILogger _logger;
    private readonly IMessageSerializer _messageSerializer;
    private readonly TimeSpan _networkTimeout;
    private readonly SemaphoreSlim _networkWriteSemaphore = new(1, 1);
    private readonly TcpClient _tcpClient;

    // Completed by the read loop when it exits, so Dispose can wait for it to wind down.
    private readonly TaskCompletionSource<bool> _tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);

    private IHandshakeService? _handshakeService;
    private ITransport? _transport;
    private bool _disposed;

    // Set right before the read loop is scheduled; lets Dispose skip the wait when there is no loop.
    private volatile bool _readLoopStarted;

    /// <summary>Raised from the read loop for every message received and decrypted.</summary>
    public event EventHandler<MemoryStream>? MessageReceived;

    /// <summary>Raised with the first exception that terminated the read loop.</summary>
    public event EventHandler<Exception>? ExceptionRaised;

    /// <summary>Whether this side initiates the handshake, as reported by the handshake service.</summary>
    public bool IsInitiator { get; }

    /// <summary>Whether the underlying <see cref="TcpClient"/> reports being connected.</summary>
    public bool IsConnected => _tcpClient.Connected;

    /// <summary>The remote static public key; set once <see cref="InitializeAsync"/> has completed.</summary>
    public CompactPubKey? RemoteStaticPublicKey { get; private set; }

    /// <summary>
    /// Creates a transport service that builds its own <see cref="HandshakeService"/> from the given keys.
    /// </summary>
    /// <param name="ecdh">ECDH implementation handed to the handshake service.</param>
    /// <param name="logger">Logger used for trace output.</param>
    /// <param name="messageSerializer">Serializer used by <see cref="WriteMessageAsync"/>.</param>
    /// <param name="networkTimeout">Timeout applied to each handshake read.</param>
    /// <param name="isInitiator">Whether this side initiates the handshake.</param>
    /// <param name="s">Key material forwarded to the handshake service as <c>s</c>.</param>
    /// <param name="rs">Key material forwarded to the handshake service as <c>rs</c>.</param>
    /// <param name="tcpClient">The TCP client to communicate over.</param>
    public TransportService(IEcdh ecdh, ILogger logger, IMessageSerializer messageSerializer, TimeSpan networkTimeout,
                            bool isInitiator, ReadOnlySpan<byte> s, ReadOnlySpan<byte> rs, TcpClient tcpClient)
        : this(logger, messageSerializer, networkTimeout, new HandshakeService(isInitiator, s, rs, null, ecdh),
               tcpClient)
    {
        // All fields are assigned by the chained constructor.
    }

    /// <summary>
    /// Creates a transport service around an existing handshake service.
    /// </summary>
    /// <param name="logger">Logger used for trace output.</param>
    /// <param name="messageSerializer">Serializer used by <see cref="WriteMessageAsync"/>.</param>
    /// <param name="networkTimeout">Timeout applied to each handshake read.</param>
    /// <param name="handshakeService">Handshake service; disposed once the handshake has completed.</param>
    /// <param name="tcpClient">The TCP client to communicate over.</param>
    internal TransportService(ILogger logger, IMessageSerializer messageSerializer, TimeSpan networkTimeout,
                              IHandshakeService handshakeService, TcpClient tcpClient)
    {
        _handshakeService = handshakeService;
        _logger = logger;
        _messageSerializer = messageSerializer;
        _networkTimeout = networkTimeout;
        _tcpClient = tcpClient;

        IsInitiator = handshakeService.IsInitiator;
    }

    /// <summary>
    /// Runs the three-act handshake (as initiator or responder), publishes
    /// <see cref="RemoteStaticPublicKey"/> and starts the background read loop.
    /// </summary>
    /// <exception cref="NullReferenceException">The handshake service is no longer available.</exception>
    /// <exception cref="InvalidOperationException">
    /// The TCP client is not connected, or the handshake finished without a transport or remote key.
    /// </exception>
    /// <exception cref="ConnectionTimeoutException">
    /// Initiator: any failure during the handshake. Responder: a handshake read was cancelled by the timeout.
    /// </exception>
    /// <exception cref="ConnectionException">Responder: any other failure during the handshake.</exception>
    public async Task InitializeAsync()
    {
        // Exception type kept as-is so existing callers observe the same failure.
        var handshakeService = _handshakeService;
        if (handshakeService == null)
            throw new NullReferenceException(nameof(_handshakeService));

        if (!_tcpClient.Connected)
            throw new InvalidOperationException("TcpClient is not connected");

        // Acquire the stream before renting so a failure here cannot leak pooled buffers.
        var stream = _tcpClient.GetStream();
        var host = _tcpClient.Client.RemoteEndPoint?.ToString() ?? "unknown";

        // ArrayPool only guarantees the requested minimum length. Both buffers are sliced up to
        // ActThreeLength (the responder reads Act Three into the read buffer), so rent that much for each.
        var writeBuffer = ArrayPool<byte>.Shared.Rent(ActThreeLength);
        var readBuffer = ArrayPool<byte>.Shared.Rent(ActThreeLength);
        try
        {
            if (handshakeService.IsInitiator)
                await PerformInitiatorHandshakeAsync(handshakeService, stream, writeBuffer, readBuffer, host);
            else
                await PerformResponderHandshakeAsync(handshakeService, stream, writeBuffer, readBuffer, host);
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(writeBuffer);
            ArrayPool<byte>.Shared.Return(readBuffer);
        }

        // Handshake completed
        if (_transport is null)
            throw new InvalidOperationException($"Handshake not completed for host {host}");

        RemoteStaticPublicKey = handshakeService.RemoteStaticPublicKey
                             ?? throw new InvalidOperationException($"RemoteStaticPublicKey is null for host {host}");

        // Listen to messages and raise event
        _logger.LogTrace("Handshake completed with host {host}, listening to messages from peer {peer}", host,
                         RemoteStaticPublicKey);
        _readLoopStarted = true;
        _ = Task.Run(ReadResponseAsync, _cts.Token).ContinueWith(task =>
        {
            if (task.Exception?.InnerExceptions.Count > 0)
                ExceptionRaised?.Invoke(this, task.Exception.InnerExceptions[0]);
        }, _cts.Token);

        // Dispose of the handshake service
        handshakeService.Dispose();
        _handshakeService = null;
    }

    /// <summary>
    /// Initiator side: writes Act One, reads Act Two under the network timeout, then writes Act Three.
    /// </summary>
    private async Task PerformInitiatorHandshakeAsync(IHandshakeService handshakeService, NetworkStream stream,
                                                      byte[] writeBuffer, byte[] readBuffer, string host)
    {
        try
        {
            _logger.LogTrace("We're initiator, writing Act One for {host}", host);

            // Write Act One
            var len = handshakeService.PerformStep(ProtocolConstants.EmptyMessage,
                                                   writeBuffer.AsSpan()[..ActOneAndTwoLength], out _);
            await stream.WriteAsync(writeBuffer.AsMemory()[..len], CancellationToken.None);
            await stream.FlushAsync(CancellationToken.None);

            // Read exactly 50 bytes; the timeout source is disposed on every path.
            _logger.LogTrace("Reading Act Two from {host}", host);
            using (var actTwoTimeout = new CancellationTokenSource(_networkTimeout))
            {
                await stream.ReadExactlyAsync(readBuffer.AsMemory()[..ActOneAndTwoLength], actTwoTimeout.Token);
            }

            // Read Act Two and Write Act Three
            _logger.LogTrace("Writing Act Three for {host}", host);
            len = handshakeService.PerformStep(readBuffer.AsSpan()[..ActOneAndTwoLength],
                                               writeBuffer.AsSpan()[..ActThreeLength], out _transport);
            await stream.WriteAsync(writeBuffer.AsMemory()[..len], CancellationToken.None);
            await stream.FlushAsync(CancellationToken.None);
        }
        catch (Exception e)
        {
            // NOTE(review): every initiator failure is reported as a timeout; kept for compatibility
            // with existing callers, the real cause is available as the inner exception.
            throw new ConnectionTimeoutException($"Timeout while reading Handshake's Act 2 from host {host}", e);
        }
    }

    /// <summary>
    /// Responder side: reads Act One, writes Act Two, then reads Act Three; both reads run under the
    /// network timeout.
    /// </summary>
    private async Task PerformResponderHandshakeAsync(IHandshakeService handshakeService, NetworkStream stream,
                                                      byte[] writeBuffer, byte[] readBuffer, string host)
    {
        var act = 1;
        _logger.LogTrace("We're responder, waiting for Act One from {host}", host);

        try
        {
            // Read exactly 50 bytes
            using (var actOneTimeout = new CancellationTokenSource(_networkTimeout))
            {
                if (!IsSocketConnected())
                    throw new ConnectionException($"TcpClient disconnected before reading Act One from host {host}");

                await stream.ReadExactlyAsync(readBuffer.AsMemory()[..ActOneAndTwoLength], actOneTimeout.Token);
            }

            // Read Act One and Write Act Two
            _logger.LogTrace("Writing Act Two for {host}", host);
            var len = handshakeService.PerformStep(readBuffer.AsSpan()[..ActOneAndTwoLength],
                                                   writeBuffer.AsSpan()[..ActOneAndTwoLength], out _);

            if (!IsSocketConnected())
                throw new ConnectionException($"TcpClient disconnected before writing Act Two to host {host}");

            await stream.WriteAsync(writeBuffer.AsMemory()[..len], CancellationToken.None);
            await stream.FlushAsync(CancellationToken.None);

            // Read exactly 66 bytes
            _logger.LogTrace("Reading Act Three from {host}", host);
            act = 3;
            using (var actThreeTimeout = new CancellationTokenSource(_networkTimeout))
            {
                if (!IsSocketConnected())
                    throw new ConnectionException($"TcpClient disconnected before reading Act Three from host {host}");

                await stream.ReadExactlyAsync(readBuffer.AsMemory()[..ActThreeLength], actThreeTimeout.Token);
            }

            // Read Act Three
            _ = handshakeService.PerformStep(readBuffer.AsSpan()[..ActThreeLength],
                                             writeBuffer.AsSpan()[..ActOneAndTwoLength], out _transport);
        }
        catch (OperationCanceledException oce)
        {
            // Covers TaskCanceledException as well as a plain OperationCanceledException from the stream.
            throw new ConnectionTimeoutException(
                $"Timeout while reading Handshake's Act {act} from host {host}", oce);
        }
        catch (Exception e)
        {
            throw new ConnectionException($"Host {host} closed the connection", e);
        }
    }

    /// <summary>
    /// Serializes, encrypts and writes a message to the peer.
    /// </summary>
    /// <param name="message">The message to send.</param>
    /// <param name="cancellationToken">Cancels waiting for the write lock and the network write.</param>
    /// <exception cref="ConnectionException">
    /// The client is not connected, the handshake has not completed, or the network write failed.
    /// </exception>
    public async Task WriteMessageAsync(IMessage message, CancellationToken cancellationToken = default)
    {
        if (_tcpClient is null)
            throw new ConnectionException("TcpClient was null while trying to write a message",
                                          new NullReferenceException(nameof(_tcpClient)));

        if (!IsSocketConnected())
            throw new ConnectionException("TcpClient was not connected while trying to write a message");

        var transport = _transport;
        if (transport is null)
            throw new ConnectionException("Handshake not completed while trying to write a message");

        // Serialize the message
        using var messageStream = new MemoryStream();
        await _messageSerializer.SerializeAsync(message, messageStream);

        // Encrypt and send under the same lock so that messages reach the wire in the order they were
        // encrypted and the transport is never used by two writers at once.
        // NOTE(review): presumably ITransport keeps per-message cipher state (BOLT 8 nonces) - confirm.
        await _networkWriteSemaphore.WaitAsync(cancellationToken);
        byte[]? buffer = null;
        try
        {
            // Encrypt message
            buffer = ArrayPool<byte>.Shared.Rent(ProtocolConstants.MaxMessageLength);
            var size = transport.WriteMessage(messageStream.ToArray(),
                                              buffer.AsSpan()[..ProtocolConstants.MaxMessageLength]);

            // Write the message to stream
            try
            {
                var stream = _tcpClient.GetStream();
                await stream.WriteAsync(buffer.AsMemory()[..size], cancellationToken);
                await stream.FlushAsync(cancellationToken);
            }
            catch (Exception e)
            {
                throw new ConnectionException("Error writing message", e);
            }
        }
        finally
        {
            if (buffer is not null)
                ArrayPool<byte>.Shared.Return(buffer);

            _networkWriteSemaphore.Release();
        }
    }

    /// <summary>
    /// Probes the socket: it counts as connected when it reports <c>Connected</c> and either is not
    /// readable or has data available. A failed probe counts as disconnected.
    /// </summary>
    private bool IsSocketConnected()
    {
        try
        {
            if (_tcpClient.Client.Connected)
            {
                // This is how you can determine if a socket is still connected.
                return _tcpClient.Client.Connected &&
                       (!_tcpClient.Client.Poll(1, SelectMode.SelectRead) || _tcpClient.Client.Available != 0);
            }

            return false;
        }
        catch (SocketException)
        {
            return false;
        }
        catch (ObjectDisposedException)
        {
            return false;
        }
    }

    /// <summary>
    /// Read loop: reads a message header and body, decrypts the payload and raises
    /// <see cref="MessageReceived"/> until cancellation is requested or an error ends the loop.
    /// </summary>
    /// <exception cref="ConnectionException">The peer disconnected, sent malformed data, or reading failed.</exception>
    private async Task ReadResponseAsync()
    {
        try
        {
            while (!_cts.IsCancellationRequested)
            {
                var buffer = ArrayPool<byte>.Shared.Rent(ProtocolConstants.MaxMessageLength);
                var memoryBuffer = buffer.AsMemory();

                try
                {
                    var transport = _transport;
                    if (transport == null)
                        throw new InvalidOperationException("Handshake not completed while trying to read a message");

                    if (_tcpClient is null || !IsSocketConnected())
                        throw new InvalidOperationException("TcpClient is not connected while trying to read a message");

                    // Read the header. A single ReadAsync may return fewer bytes than requested, so keep
                    // reading until the header is complete or the stream ends.
                    var stream = _tcpClient.GetStream();
                    var lenRead = await stream.ReadAtLeastAsync(memoryBuffer[..ProtocolConstants.MessageHeaderSize],
                                                                ProtocolConstants.MessageHeaderSize,
                                                                throwOnEndOfStream: false, _cts.Token);
                    if (_cts.IsCancellationRequested)
                        break;

                    if (lenRead != ProtocolConstants.MessageHeaderSize)
                    {
                        if (!IsSocketConnected() || lenRead == 0)
                            throw new ConnectionException(
                                "TcpClient is not connected while trying to read a message header");

                        throw new ConnectionException("Peer sent wrong length");
                    }

                    var messageLen =
                        transport.ReadMessageLength(memoryBuffer[..ProtocolConstants.MessageHeaderSize].Span);
                    if (_cts.IsCancellationRequested)
                        break;

                    if (messageLen > ProtocolConstants.MaxMessageLength)
                        throw new ConnectionException("Peer sent message too long");

                    if (!IsSocketConnected())
                        throw new ConnectionException("TcpClient is not connected while trying to read a message body");

                    // Same as above: the body may arrive in several chunks.
                    lenRead = await stream.ReadAtLeastAsync(memoryBuffer[..messageLen], messageLen,
                                                            throwOnEndOfStream: false, _cts.Token);
                    if (_cts.IsCancellationRequested)
                        break;

                    if (lenRead != messageLen)
                        throw new ConnectionException("Peer sent wrong body length");

                    messageLen = transport.ReadMessagePayload(memoryBuffer[..lenRead].Span, buffer);

                    // Raise event; the array range copies the payload, so the pooled buffer can be returned.
                    var messageStream = new MemoryStream(buffer[..messageLen]);
                    MessageReceived?.Invoke(this, messageStream);
                }
                catch (OperationCanceledException)
                {
                    // Ignore cancellation
                }
                catch (ConnectionException)
                {
                    throw;
                }
                catch (Exception e)
                {
                    if (!_cts.IsCancellationRequested)
                    {
                        if (_tcpClient is null || !_tcpClient.Connected)
                            throw new ConnectionException("Peer closed the connection");

                        throw new ConnectionException("Error reading response", e);
                    }
                }
                finally
                {
                    ArrayPool<byte>.Shared.Return(buffer, true);
                }
            }
        }
        finally
        {
            // Signal on every exit path, including exceptions, so Dispose never waits out its full timeout.
            _tcs.TrySetResult(true);
        }
    }

    #region Dispose Pattern

    /// <summary>
    /// Cancels the read loop, waits up to five seconds for it to exit and releases the owned resources.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    private void Dispose(bool disposing)
    {
        if (_disposed)
            return;

        if (disposing)
        {
            // Cancel and wait for an elegant shutdown
            _cts.Cancel();

            // Only wait when a read loop was actually started; otherwise nothing will ever signal _tcs.
            if (_readLoopStarted)
                _tcs.Task.Wait(TimeSpan.FromSeconds(5));

            _handshakeService?.Dispose();
            _transport?.Dispose();
            _tcpClient.Dispose();
            _cts.Dispose();
        }

        _disposed = true;
    }

    ~TransportService()
    {
        Dispose(false);
    }

    #endregion
}