< Summary - Combined Code Coverage

Information
Class: NLightning.Infrastructure.Transport.Services.TransportService
Assembly: NLightning.Infrastructure
File(s): /home/runner/work/nlightning/nlightning/src/NLightning.Infrastructure/Transport/Services/TransportService.cs
Tag: 30_15166811759
Line coverage
47%
Covered lines: 67
Uncovered lines: 73
Coverable lines: 140
Total lines: 297
Line coverage: 47.8%
Branch coverage
37%
Covered branches: 20
Total branches: 54
Branch coverage: 37%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%1.03168.42%
get_IsInitiator()100%210%
get_IsConnected()100%210%
get_RemoteStaticPublicKey()100%11100%
.ctor(...)100%210%
InitializeAsync()55.56%18.491264.41%
WriteMessageAsync()0%4260%
ReadResponseAsync()36.36%179.282231.25%
Dispose()100%210%
Dispose(...)25%21.82840%
Finalize()100%11100%

File(s)

/home/runner/work/nlightning/nlightning/src/NLightning.Infrastructure/Transport/Services/TransportService.cs

#LineLine coverage
 1using System.Net.Sockets;
 2using Microsoft.Extensions.Logging;
 3
 4namespace NLightning.Infrastructure.Transport.Services;
 5
 6using Domain.Exceptions;
 7using Domain.Protocol.Messages.Interfaces;
 8using Domain.Serialization.Messages;
 9using Domain.Transport;
 10using Exceptions;
 11using Interfaces;
 12using Protocol.Constants;
 13
 14internal sealed class TransportService : ITransportService
 15{
 1216    private readonly CancellationTokenSource _cts = new();
 17    private readonly ILogger _logger;
 18    private readonly IMessageSerializer _messageSerializer;
 19    private readonly TimeSpan _networkTimeout;
 1220    private readonly SemaphoreSlim _networkWriteSemaphore = new(1, 1);
 21    private readonly TcpClient _tcpClient;
 1222    private readonly TaskCompletionSource<bool> _tcs = new();
 23
 24    private IHandshakeService? _handshakeService;
 25    private ITransport? _transport;
 26    private bool _disposed;
 27
 28    // event that will be called when a message is received
 29    public event EventHandler<MemoryStream>? MessageReceived;
 30    public event EventHandler<Exception>? ExceptionRaised;
 31
 032    public bool IsInitiator { get; }
 033    public bool IsConnected => _tcpClient.Connected;
 434    public NBitcoin.PubKey? RemoteStaticPublicKey { get; private set; }
 35
 36    public TransportService(ILogger logger, IMessageSerializer messageSerializer, TimeSpan networkTimeout,
 37                            bool isInitiator, ReadOnlySpan<byte> s, ReadOnlySpan<byte> rs, TcpClient tcpClient)
 038        : this(logger, messageSerializer, networkTimeout, new HandshakeService(isInitiator, s, rs), tcpClient)
 39    {
 040        _messageSerializer = messageSerializer;
 041        _networkTimeout = networkTimeout;
 042    }
 43
 1244    internal TransportService(ILogger logger, IMessageSerializer messageSerializer, TimeSpan networkTimeout, IHandshakeS
 1245                              TcpClient tcpClient)
 46    {
 1247        _handshakeService = handshakeService;
 1248        _logger = logger;
 1249        _messageSerializer = messageSerializer;
 1250        _networkTimeout = networkTimeout;
 1251        _tcpClient = tcpClient;
 52
 1253        IsInitiator = handshakeService.IsInitiator;
 1254    }
 55
 56    public async Task InitializeAsync()
 57    {
 1258        if (_handshakeService == null)
 59        {
 060            throw new NullReferenceException(nameof(_handshakeService));
 61        }
 62
 1263        if (!_tcpClient.Connected)
 64        {
 465            throw new InvalidOperationException("TcpClient is not connected");
 66        }
 67
 868        var writeBuffer = new byte[50];
 869        var stream = _tcpClient.GetStream();
 70
 871        CancellationTokenSource networkTimeoutCancellationTokenSource = new();
 72
 873        if (_handshakeService.IsInitiator)
 74        {
 75            try
 76            {
 877                _logger.LogTrace("We're initiator, writing Act One");
 78
 79                // Write Act One
 880                var len = _handshakeService.PerformStep(ProtocolConstants.EMPTY_MESSAGE, writeBuffer, out _);
 881                await stream.WriteAsync(writeBuffer.AsMemory()[..len], networkTimeoutCancellationTokenSource.Token);
 882                await stream.FlushAsync(networkTimeoutCancellationTokenSource.Token);
 83
 84                // Read exactly 50 bytes
 885                _logger.LogTrace("Reading Act Two");
 886                networkTimeoutCancellationTokenSource = new CancellationTokenSource(_networkTimeout);
 887                var readBuffer = new byte[50];
 888                await stream.ReadExactlyAsync(readBuffer, networkTimeoutCancellationTokenSource.Token);
 489                networkTimeoutCancellationTokenSource.Dispose();
 90
 91                // Read Act Two and Write Act Three
 492                _logger.LogTrace("Writing Act Three");
 493                writeBuffer = new byte[66];
 494                len = _handshakeService.PerformStep(readBuffer, writeBuffer, out _transport);
 495                await stream.WriteAsync(writeBuffer.AsMemory()[..len], CancellationToken.None);
 496                await stream.FlushAsync(CancellationToken.None);
 497            }
 498            catch (Exception e)
 99            {
 4100                throw new ConnectionTimeoutException("Timeout while reading Handhsake's Act 2", e);
 101            }
 102        }
 103        else
 104        {
 0105            var act = 1;
 106
 107            try
 108            {
 0109                _logger.LogTrace("We're NOT initiator, reading Act One");
 110
 111                // Read exactly 50 bytes
 0112                networkTimeoutCancellationTokenSource = new CancellationTokenSource(_networkTimeout);
 0113                var readBuffer = new byte[50];
 0114                await stream.ReadExactlyAsync(readBuffer, networkTimeoutCancellationTokenSource.Token);
 115
 116                // Read Act One and Write Act Two
 0117                _logger.LogTrace("Writing Act Two");
 0118                var len = _handshakeService.PerformStep(readBuffer, writeBuffer, out _);
 0119                await stream.WriteAsync(writeBuffer.AsMemory()[..len], CancellationToken.None);
 0120                await stream.FlushAsync(CancellationToken.None);
 121
 122                // Read exactly 66 bytes
 0123                _logger.LogTrace("Reading Act Three");
 0124                act = 3;
 0125                networkTimeoutCancellationTokenSource = new CancellationTokenSource(_networkTimeout);
 0126                readBuffer = new byte[66];
 0127                await stream.ReadExactlyAsync(readBuffer, networkTimeoutCancellationTokenSource.Token);
 0128                networkTimeoutCancellationTokenSource.Dispose();
 129
 130                // Read Act Three
 0131                _ = _handshakeService.PerformStep(readBuffer, writeBuffer, out _transport);
 0132            }
 0133            catch (Exception e)
 134            {
 0135                throw new ConnectionTimeoutException($"Timeout while reading Handhsake's Act {act}", e);
 136            }
 137        }
 138
 139        // Handshake completed
 4140        if (_transport is null)
 141        {
 0142            throw new InvalidOperationException("Handshake not completed");
 143        }
 4144        RemoteStaticPublicKey = _handshakeService.RemoteStaticPublicKey
 4145                                ?? throw new InvalidOperationException("RemoteStaticPublicKey is null");
 146
 147        // Listen to messages and raise event
 4148        _logger.LogTrace("Handshake completed, listening to messages");
 4149        _ = Task.Run(ReadResponseAsync, CancellationToken.None).ContinueWith(task =>
 4150        {
 2151            if (task.Exception?.InnerExceptions.Count > 0)
 4152            {
 2153                ExceptionRaised?.Invoke(this, task.Exception.InnerExceptions[0]);
 4154            }
 4155        }, TaskContinuationOptions.OnlyOnFaulted);
 156
 157        // Dispose of the handshake service
 4158        _handshakeService.Dispose();
 4159        _handshakeService = null;
 4160    }
 161
 162    public async Task WriteMessageAsync(IMessage message, CancellationToken cancellationToken = default)
 163    {
 0164        if (_tcpClient is null || !_tcpClient.Connected)
 165        {
 0166            throw new InvalidOperationException("TcpClient is not connected");
 167        }
 168
 0169        if (_transport is null)
 170        {
 0171            throw new InvalidOperationException("Handshake not completed");
 172        }
 173
 174        // Serialize message
 0175        using var messageStream = new MemoryStream();
 0176        await _messageSerializer.SerializeAsync(message, messageStream);
 177
 178        // Encrypt message
 0179        var buffer = new byte[ProtocolConstants.MAX_MESSAGE_LENGTH];
 0180        var size = _transport.WriteMessage(messageStream.ToArray(), buffer);
 181
 182        // Write message to stream
 0183        await _networkWriteSemaphore.WaitAsync(cancellationToken);
 184        try
 185        {
 0186            var stream = _tcpClient.GetStream();
 0187            await stream.WriteAsync(buffer.AsMemory()[..size], cancellationToken);
 0188            await stream.FlushAsync(cancellationToken);
 0189        }
 190        finally
 191        {
 0192            _networkWriteSemaphore.Release();
 193        }
 0194    }
 195
 196    private async Task ReadResponseAsync()
 197    {
 4198        while (!_cts.IsCancellationRequested)
 199        {
 200            try
 201            {
 4202                if (_transport == null)
 203                {
 0204                    throw new InvalidOperationException("Handshake not completed");
 205                }
 206
 4207                if (_tcpClient is null || !_tcpClient.Connected)
 208                {
 0209                    throw new InvalidOperationException("TcpClient is not connected");
 210                }
 211
 212                // Read response
 4213                var stream = _tcpClient.GetStream();
 4214                var memory = new byte[ProtocolConstants.MAX_MESSAGE_LENGTH].AsMemory();
 4215                var lenRead = await stream.ReadAsync(memory[..ProtocolConstants.MESSAGE_HEADER_SIZE], _cts.Token);
 0216                if (lenRead != 18)
 217                {
 0218                    throw new ConnectionException("Peer sent wrong length");
 219                }
 220
 0221                var messageLen = _transport.ReadMessageLength(memory.Span[..ProtocolConstants.MESSAGE_HEADER_SIZE]);
 0222                if (messageLen > ProtocolConstants.MAX_MESSAGE_LENGTH)
 223                {
 0224                    throw new ConnectionException("Peer sent message too long");
 225                }
 226
 0227                lenRead = await stream.ReadAsync(memory[..messageLen], _cts.Token);
 0228                if (lenRead != messageLen)
 229                {
 0230                    throw new ConnectionException("Peer sent wrong body length");
 231                }
 232
 0233                messageLen = _transport.ReadMessagePayload(memory.Span[..messageLen], memory.Span);
 234
 235                // Raise event
 0236                var messageStream = new MemoryStream(memory.Span[..messageLen].ToArray());
 0237                MessageReceived?.Invoke(this, messageStream);
 0238            }
 0239            catch (OperationCanceledException)
 240            {
 241                // Ignore cancellation
 0242            }
 0243            catch (ConnectionException)
 244            {
 0245                throw;
 246            }
 2247            catch (Exception e)
 248            {
 2249                if (!_cts.IsCancellationRequested)
 250                {
 2251                    if (_tcpClient is null || !_tcpClient.Connected)
 252                    {
 2253                        throw new ConnectionException("Peer closed the connection");
 254                    }
 255
 0256                    throw new ConnectionException("Error reading response", e);
 257                }
 0258            }
 259        }
 260
 0261        _tcs.TrySetResult(true);
 0262    }
 263
 264    #region Dispose Pattern
 265    public void Dispose()
 266    {
 0267        Dispose(true);
 0268        GC.SuppressFinalize(this);
 0269    }
 270
 271    private void Dispose(bool disposing)
 272    {
 4273        if (_disposed)
 274        {
 0275            return;
 276        }
 277
 4278        if (disposing)
 279        {
 280            // Cancel and wait for an elegant shutdown
 0281            _cts.Cancel();
 0282            _tcs.Task.Wait(TimeSpan.FromSeconds(5));
 283
 0284            _handshakeService?.Dispose();
 0285            _transport?.Dispose();
 0286            _tcpClient.Dispose();
 287        }
 288
 4289        _disposed = true;
 4290    }
 291
 292    ~TransportService()
 293    {
 4294        Dispose(false);
 8295    }
 296    #endregion
 297}