< Summary - Combined Code Coverage

Information
Class: NLightning.Infrastructure.Transport.Services.TransportService
Assembly: NLightning.Infrastructure
File(s): /home/runner/work/nlightning/nlightning/src/NLightning.Infrastructure/Transport/Services/TransportService.cs
Tag: 36_15743069263
Line coverage
42%
Covered lines: 60
Uncovered lines: 82
Coverable lines: 142
Total lines: 288
Line coverage: 42.2%
Branch coverage
24%
Covered branches: 13
Total branches: 54
Branch coverage: 24%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%1.04165%
get_IsInitiator()100%210%
get_IsConnected()100%210%
get_RemoteStaticPublicKey()100%11100%
.ctor(...)100%210%
InitializeAsync()38.89%21.461259.65%
WriteMessageAsync()0%4260%
ReadResponseAsync()18.18%269.812220%
Dispose()100%210%
Dispose(...)25%21.82840%
Finalize()100%11100%

File(s)

/home/runner/work/nlightning/nlightning/src/NLightning.Infrastructure/Transport/Services/TransportService.cs

#LineLine coverage
 1using System.Buffers;
 2using System.Net.Sockets;
 3using Microsoft.Extensions.Logging;
 4
 5namespace NLightning.Infrastructure.Transport.Services;
 6
 7using Crypto.Interfaces;
 8using Domain.Crypto.ValueObjects;
 9using Domain.Exceptions;
 10using Domain.Protocol.Interfaces;
 11using Domain.Serialization.Interfaces;
 12using Domain.Transport;
 13using Exceptions;
 14using Interfaces;
 15using Protocol.Constants;
 16
 17internal sealed class TransportService : ITransportService
 18{
 1219    private readonly CancellationTokenSource _cts = new();
 20    private readonly ILogger _logger;
 21    private readonly IMessageSerializer _messageSerializer;
 22    private readonly TimeSpan _networkTimeout;
 1223    private readonly SemaphoreSlim _networkWriteSemaphore = new(1, 1);
 24    private readonly TcpClient _tcpClient;
 1225    private readonly TaskCompletionSource<bool> _tcs = new();
 26
 27    private IHandshakeService? _handshakeService;
 28    private ITransport? _transport;
 29    private bool _disposed;
 30
 31    // event that will be called when a message is received
 32    public event EventHandler<MemoryStream>? MessageReceived;
 33    public event EventHandler<Exception>? ExceptionRaised;
 34
 035    public bool IsInitiator { get; }
 036    public bool IsConnected => _tcpClient.Connected;
 437    public CompactPubKey? RemoteStaticPublicKey { get; private set; }
 38
 39    public TransportService(IEcdh ecdh, ILogger logger, IMessageSerializer messageSerializer, TimeSpan networkTimeout,
 40                            bool isInitiator, ReadOnlySpan<byte> s, ReadOnlySpan<byte> rs, TcpClient tcpClient)
 041        : this(logger, messageSerializer, networkTimeout, new HandshakeService(isInitiator, s, rs, null, ecdh),
 042               tcpClient)
 43    {
 044        _messageSerializer = messageSerializer;
 045        _networkTimeout = networkTimeout;
 046    }
 47
 1248    internal TransportService(ILogger logger, IMessageSerializer messageSerializer, TimeSpan networkTimeout,
 1249                              IHandshakeService handshakeService, TcpClient tcpClient)
 50    {
 1251        _handshakeService = handshakeService;
 1252        _logger = logger;
 1253        _messageSerializer = messageSerializer;
 1254        _networkTimeout = networkTimeout;
 1255        _tcpClient = tcpClient;
 56
 1257        IsInitiator = handshakeService.IsInitiator;
 1258    }
 59
 60    public async Task InitializeAsync()
 61    {
 1262        if (_handshakeService == null)
 063            throw new NullReferenceException(nameof(_handshakeService));
 64
 1265        if (!_tcpClient.Connected)
 466            throw new InvalidOperationException("TcpClient is not connected");
 67
 868        var writeBuffer = new byte[50];
 869        var stream = _tcpClient.GetStream();
 70
 871        CancellationTokenSource networkTimeoutCancellationTokenSource = new();
 72
 873        if (_handshakeService.IsInitiator)
 74        {
 75            try
 76            {
 877                _logger.LogTrace("We're initiator, writing Act One");
 78
 79                // Write Act One
 880                var len = _handshakeService.PerformStep(ProtocolConstants.EmptyMessage, writeBuffer, out _);
 881                await stream.WriteAsync(writeBuffer.AsMemory()[..len], networkTimeoutCancellationTokenSource.Token);
 882                await stream.FlushAsync(networkTimeoutCancellationTokenSource.Token);
 83
 84                // Read exactly 50 bytes
 885                _logger.LogTrace("Reading Act Two");
 886                networkTimeoutCancellationTokenSource = new CancellationTokenSource(_networkTimeout);
 887                var readBuffer = new byte[50];
 888                await stream.ReadExactlyAsync(readBuffer, networkTimeoutCancellationTokenSource.Token);
 489                networkTimeoutCancellationTokenSource.Dispose();
 90
 91                // Read Act Two and Write Act Three
 492                _logger.LogTrace("Writing Act Three");
 493                writeBuffer = new byte[66];
 494                len = _handshakeService.PerformStep(readBuffer, writeBuffer, out _transport);
 495                await stream.WriteAsync(writeBuffer.AsMemory()[..len], CancellationToken.None);
 496                await stream.FlushAsync(CancellationToken.None);
 497            }
 498            catch (Exception e)
 99            {
 4100                throw new ConnectionTimeoutException("Timeout while reading Handhsake's Act 2", e);
 101            }
 102        }
 103        else
 104        {
 0105            var act = 1;
 106
 107            try
 108            {
 0109                _logger.LogTrace("We're NOT initiator, reading Act One");
 110
 111                // Read exactly 50 bytes
 0112                networkTimeoutCancellationTokenSource = new CancellationTokenSource(_networkTimeout);
 0113                var readBuffer = new byte[50];
 0114                await stream.ReadExactlyAsync(readBuffer, networkTimeoutCancellationTokenSource.Token);
 115
 116                // Read Act One and Write Act Two
 0117                _logger.LogTrace("Writing Act Two");
 0118                var len = _handshakeService.PerformStep(readBuffer, writeBuffer, out _);
 0119                await stream.WriteAsync(writeBuffer.AsMemory()[..len], CancellationToken.None);
 0120                await stream.FlushAsync(CancellationToken.None);
 121
 122                // Read exactly 66 bytes
 0123                _logger.LogTrace("Reading Act Three");
 0124                act = 3;
 0125                networkTimeoutCancellationTokenSource = new CancellationTokenSource(_networkTimeout);
 0126                readBuffer = new byte[66];
 0127                await stream.ReadExactlyAsync(readBuffer, networkTimeoutCancellationTokenSource.Token);
 0128                networkTimeoutCancellationTokenSource.Dispose();
 129
 130                // Read Act Three
 0131                _ = _handshakeService.PerformStep(readBuffer, writeBuffer, out _transport);
 0132            }
 0133            catch (Exception e)
 134            {
 0135                throw new ConnectionTimeoutException($"Timeout while reading Handhsake's Act {act}", e);
 136            }
 137        }
 138
 139        // Handshake completed
 4140        if (_transport is null)
 141        {
 0142            throw new InvalidOperationException("Handshake not completed");
 143        }
 144
 4145        RemoteStaticPublicKey = _handshakeService.RemoteStaticPublicKey
 4146                             ?? throw new InvalidOperationException("RemoteStaticPublicKey is null");
 147
 148        // Listen to messages and raise event
 4149        _logger.LogTrace("Handshake completed, listening to messages");
 4150        _ = Task.Run(ReadResponseAsync, CancellationToken.None).ContinueWith(task =>
 4151        {
 0152            if (task.Exception?.InnerExceptions.Count > 0)
 0153                ExceptionRaised?.Invoke(this, task.Exception.InnerExceptions[0]);
 4154        }, TaskContinuationOptions.OnlyOnFaulted);
 155
 156        // Dispose of the handshake service
 4157        _handshakeService.Dispose();
 4158        _handshakeService = null;
 4159    }
 160
 161    public async Task WriteMessageAsync(IMessage message, CancellationToken cancellationToken = default)
 162    {
 0163        if (_tcpClient is null || !_tcpClient.Connected)
 0164            throw new InvalidOperationException("TcpClient is not connected");
 165
 0166        if (_transport is null)
 0167            throw new InvalidOperationException("Handshake not completed");
 168
 169        // Serialize message
 0170        using var messageStream = new MemoryStream();
 0171        await _messageSerializer.SerializeAsync(message, messageStream);
 172
 173        // Encrypt message
 0174        var buffer = new byte[ProtocolConstants.MaxMessageLength];
 0175        var size = _transport.WriteMessage(messageStream.ToArray(), buffer);
 176
 177        // Write the message to stream
 0178        await _networkWriteSemaphore.WaitAsync(cancellationToken);
 179        try
 180        {
 0181            var stream = _tcpClient.GetStream();
 0182            await stream.WriteAsync(buffer.AsMemory()[..size], cancellationToken);
 0183            await stream.FlushAsync(cancellationToken);
 0184        }
 185        finally
 186        {
 0187            _networkWriteSemaphore.Release();
 188        }
 0189    }
 190
 191    private async Task ReadResponseAsync()
 192    {
 4193        while (!_cts.IsCancellationRequested)
 194        {
 4195            var buffer = ArrayPool<byte>.Shared.Rent(ProtocolConstants.MaxMessageLength);
 4196            var memoryBuffer = buffer.AsMemory();
 197
 198            try
 199            {
 4200                if (_transport == null)
 0201                    throw new InvalidOperationException("Handshake not completed");
 202
 4203                if (_tcpClient is null || !_tcpClient.Connected)
 0204                    throw new InvalidOperationException("TcpClient is not connected");
 205
 206                // Read response
 4207                var stream = _tcpClient.GetStream();
 4208                var lenRead = await stream.ReadAsync(memoryBuffer[..ProtocolConstants.MessageHeaderSize], _cts.Token);
 0209                if (lenRead != ProtocolConstants.MessageHeaderSize)
 0210                    throw new ConnectionException("Peer sent wrong length");
 211
 0212                var messageLen = _transport.ReadMessageLength(memoryBuffer[..ProtocolConstants.MessageHeaderSize].Span);
 0213                if (messageLen > ProtocolConstants.MaxMessageLength)
 0214                    throw new ConnectionException("Peer sent message too long");
 215
 0216                lenRead = await stream.ReadAsync(memoryBuffer[..messageLen], _cts.Token);
 0217                if (lenRead != messageLen)
 0218                    throw new ConnectionException("Peer sent wrong body length");
 219
 0220                messageLen = _transport.ReadMessagePayload(memoryBuffer[..lenRead].Span, buffer);
 221
 222                // Raise event
 0223                var messageStream = new MemoryStream(buffer[..messageLen]);
 0224                MessageReceived?.Invoke(this, messageStream);
 0225            }
 0226            catch (OperationCanceledException)
 227            {
 228                // Ignore cancellation
 0229            }
 0230            catch (ConnectionException)
 231            {
 0232                throw;
 233            }
 0234            catch (Exception e)
 235            {
 0236                if (!_cts.IsCancellationRequested)
 237                {
 0238                    if (_tcpClient is null || !_tcpClient.Connected)
 0239                        throw new ConnectionException("Peer closed the connection");
 240
 0241                    throw new ConnectionException("Error reading response", e);
 242                }
 0243            }
 244            finally
 245            {
 0246                ArrayPool<byte>.Shared.Return(buffer, true);
 247            }
 0248        }
 249
 0250        _tcs.TrySetResult(true);
 0251    }
 252
 253    #region Dispose Pattern
 254
 255    public void Dispose()
 256    {
 0257        Dispose(true);
 0258        GC.SuppressFinalize(this);
 0259    }
 260
 261    private void Dispose(bool disposing)
 262    {
 2263        if (_disposed)
 264        {
 0265            return;
 266        }
 267
 2268        if (disposing)
 269        {
 270            // Cancel and wait for an elegant shutdown
 0271            _cts.Cancel();
 0272            _tcs.Task.Wait(TimeSpan.FromSeconds(5));
 273
 0274            _handshakeService?.Dispose();
 0275            _transport?.Dispose();
 0276            _tcpClient.Dispose();
 277        }
 278
 2279        _disposed = true;
 2280    }
 281
 282    ~TransportService()
 283    {
 2284        Dispose(false);
 4285    }
 286
 287    #endregion
 288}