< Summary - Combined Code Coverage

Information
Class: NLightning.Infrastructure.Node.Services.PeerCommunicationService
Assembly: NLightning.Infrastructure
File(s): /home/runner/work/nlightning/nlightning/src/NLightning.Infrastructure/Node/Services/PeerCommunicationService.cs
Tag: 38_17925369700
Line coverage
0%
Covered lines: 0
Uncovered lines: 126
Coverable lines: 126
Total lines: 279
Line coverage: 0%
Branch coverage
0%
Covered branches: 0
Total branches: 50
Branch coverage: 0%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%210%
get_IsConnected()100%210%
get_PeerCompactPubKey()100%210%
InitializeAsync()0%2040%
SendMessageAsync()100%210%
Disconnect()0%620%
SetupPingPongService()100%210%
HandlePongReceived(...)100%210%
HandlePingMessageReady(...)0%620%
HandleMessageReceived(...)0%342180%
HandlePingAsync()100%210%
HandleExceptionRaised(...)100%210%
RaiseException(...)0%420200%

File(s)

/home/runner/work/nlightning/nlightning/src/NLightning.Infrastructure/Node/Services/PeerCommunicationService.cs

#LineLine coverage
 1using Microsoft.Extensions.DependencyInjection;
 2using Microsoft.Extensions.Logging;
 3
 4namespace NLightning.Infrastructure.Node.Services;
 5
 6using Domain.Channels.ValueObjects;
 7using Domain.Crypto.ValueObjects;
 8using Domain.Exceptions;
 9using Domain.Node.Interfaces;
 10using Domain.Persistence.Interfaces;
 11using Domain.Protocol.Constants;
 12using Domain.Protocol.Interfaces;
 13using Domain.Protocol.Messages;
 14using Domain.Protocol.Payloads;
 15
 16/// <summary>
 17/// Service for communication with a single peer.
 18/// </summary>
 19public class PeerCommunicationService : IPeerCommunicationService
 20{
 021    private readonly CancellationTokenSource _cts = new();
 22    private readonly ILogger<PeerCommunicationService> _logger;
 23    private readonly IMessageService _messageService;
 24    private readonly IPingPongService _pingPongService;
 25    private readonly IServiceProvider _serviceProvider;
 26    private readonly IMessageFactory _messageFactory;
 027    private readonly TaskCompletionSource<bool> _pingPongTcs = new();
 28
 29    private bool _isInitialized;
 30    private CancellationTokenSource? _initWaitCancellationTokenSource;
 31
 32    /// <inheritdoc />
 33    public event EventHandler<IMessage?>? MessageReceived;
 34
 35    /// <inheritdoc />
 36    public event EventHandler? DisconnectEvent;
 37
 38    /// <inheritdoc />
 39    public event EventHandler<Exception>? ExceptionRaised;
 40
 41    /// <inheritdoc />
 042    public bool IsConnected => _messageService.IsConnected;
 43
 44    /// <inheritdoc />
 045    public CompactPubKey PeerCompactPubKey { get; }
 46
 47    /// <summary>
 48    /// Initializes a new instance of the <see cref="PeerCommunicationService"/> class.
 49    /// </summary>
 50    /// <param name="logger">The logger.</param>
 51    /// <param name="messageService">The message service.</param>
 52    /// <param name="messageFactory">The message factory.</param>
 53    /// <param name="peerCompactPubKey">The peer's public key.</param>
 54    /// <param name="pingPongService">The ping pong service.</param>
 55    /// <param name="serviceProvider">The service provider.</param>
 056    public PeerCommunicationService(ILogger<PeerCommunicationService> logger, IMessageService messageService,
 057                                    IMessageFactory messageFactory, CompactPubKey peerCompactPubKey,
 058                                    IPingPongService pingPongService, IServiceProvider serviceProvider)
 59    {
 060        _logger = logger;
 061        _messageService = messageService;
 062        _messageFactory = messageFactory;
 063        PeerCompactPubKey = peerCompactPubKey;
 064        _pingPongService = pingPongService;
 065        _serviceProvider = serviceProvider;
 66
 067        _messageService.OnMessageReceived += HandleMessageReceived;
 068        _messageService.OnExceptionRaised += HandleExceptionRaised;
 069        _pingPongService.DisconnectEvent += HandleExceptionRaised;
 070    }
 71
 72    /// <inheritdoc />
 73    public async Task InitializeAsync(TimeSpan networkTimeout)
 74    {
 075        _logger.LogTrace("Waiting for init message from peer {peer}", PeerCompactPubKey);
 76
 77        // Set timeout to close connection if the other peer doesn't send an init message
 078        _initWaitCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token);
 079        _ = Task.Delay(networkTimeout, _initWaitCancellationTokenSource.Token).ContinueWith(task =>
 080        {
 081            if (!task.IsCanceled && !_isInitialized)
 082            {
 083                RaiseException(
 084                    new ConnectionException($"Peer {PeerCompactPubKey} did not send init message after timeout"));
 085            }
 086        });
 87
 88        // Always send an init message upon connection
 089        _logger.LogTrace("Sending init message to peer {peer}", PeerCompactPubKey);
 090        var initMessage = _messageFactory.CreateInitMessage();
 91        try
 92        {
 093            await _messageService.SendMessageAsync(initMessage, true, _cts.Token);
 094        }
 095        catch (Exception e)
 96        {
 097            _pingPongTcs.TrySetResult(true);
 098            throw new ConnectionException($"Failed to send init message to peer {PeerCompactPubKey}", e);
 99        }
 100
 101        // Set up ping service to keep connection alive
 0102        if (!_cts.IsCancellationRequested)
 103        {
 0104            if (!_messageService.IsConnected)
 0105                throw new ConnectionException($"Failed to connect to peer {PeerCompactPubKey}");
 106
 0107            SetupPingPongService();
 108        }
 0109    }
 110
 111    /// <inheritdoc />
 112    public async Task SendMessageAsync(IMessage message, CancellationToken cancellationToken = default)
 113    {
 114        try
 115        {
 0116            await _messageService.SendMessageAsync(message, cancellationToken: cancellationToken);
 0117        }
 0118        catch (Exception ex)
 119        {
 0120            RaiseException(new ConnectionException($"Failed to send message to peer {PeerCompactPubKey}", ex));
 0121        }
 0122    }
 123
 124    /// <inheritdoc />
 125    public void Disconnect()
 126    {
 127        try
 128        {
 0129            _ = _cts.CancelAsync();
 0130            _logger.LogTrace("Waiting for ping service to stop for peer {peer}", PeerCompactPubKey);
 0131            _pingPongTcs.Task.Wait(TimeSpan.FromSeconds(5));
 0132            _logger.LogTrace("Ping service stopped for peer {peer}", PeerCompactPubKey);
 0133        }
 134        finally
 135        {
 0136            _messageService.Dispose();
 0137            DisconnectEvent?.Invoke(this, EventArgs.Empty);
 0138        }
 0139    }
 140
 141    private void SetupPingPongService()
 142    {
 0143        _pingPongService.OnPingMessageReady += HandlePingMessageReady;
 0144        _pingPongService.OnPongReceived += HandlePongReceived;
 145
 146        // Setup Ping to keep connection alive
 0147        _ = _pingPongService.StartPingAsync(_cts.Token).ContinueWith(_ =>
 0148        {
 0149            _logger.LogTrace("Ping service stopped for peer {peer}, setting result", PeerCompactPubKey);
 0150            _pingPongTcs.TrySetResult(true);
 0151        });
 152
 0153        _logger.LogInformation("Ping service started for peer {peer}", PeerCompactPubKey);
 0154    }
 155
 156    private void HandlePongReceived(object? sender, EventArgs e)
 157    {
 0158        using var scope = _serviceProvider.CreateScope();
 0159        using var uow = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
 160
 0161        uow.PeerDbRepository.UpdatePeerLastSeenAsync(PeerCompactPubKey).GetAwaiter().GetResult();
 0162        uow.SaveChanges();
 0163    }
 164
 165    private void HandlePingMessageReady(object? sender, IMessage pingMessage)
 166    {
 167        // We can only send ping messages if the peer is initialized
 0168        if (!_isInitialized)
 0169            return;
 170
 171        try
 172        {
 0173            _messageService.SendMessageAsync(pingMessage, cancellationToken: _cts.Token).GetAwaiter().GetResult();
 0174        }
 0175        catch (Exception ex)
 176        {
 0177            RaiseException(new ConnectionException($"Failed to send ping message to peer {PeerCompactPubKey}", ex));
 0178        }
 0179    }
 180
 181    private void HandleMessageReceived(object? sender, IMessage? message)
 182    {
 0183        if (message is null)
 184        {
 0185            return;
 186        }
 187
 0188        if (!_isInitialized && message.Type == MessageTypes.Init)
 189        {
 0190            _isInitialized = true;
 0191            _initWaitCancellationTokenSource?.Cancel();
 192        }
 193
 194        // Forward the message to subscribers
 0195        MessageReceived?.Invoke(this, message);
 196
 197        // Handle ping messages internally
 0198        if (_isInitialized && message.Type == MessageTypes.Ping)
 199        {
 0200            _ = HandlePingAsync(message);
 201        }
 0202        else if (_isInitialized && message.Type == MessageTypes.Pong)
 203        {
 0204            _pingPongService.HandlePong(message);
 205        }
 0206    }
 207
 208    private async Task HandlePingAsync(IMessage pingMessage)
 209    {
 0210        var pongMessage = _messageFactory.CreatePongMessage(pingMessage);
 0211        await _messageService.SendMessageAsync(pongMessage);
 0212    }
 213
 214    private void HandleExceptionRaised(object? sender, Exception e)
 215    {
 0216        RaiseException(e);
 0217    }
 218
 219    private void RaiseException(Exception exception)
 220    {
 0221        var mustDisconnect = false;
 0222        if (exception is ErrorException errorException)
 223        {
 0224            ChannelId? channelId = null;
 0225            var message = errorException.Message;
 0226            mustDisconnect = true;
 227
 0228            if (errorException is ChannelErrorException channelErrorException)
 229            {
 0230                channelId = channelErrorException.ChannelId;
 0231                if (!string.IsNullOrWhiteSpace(channelErrorException.PeerMessage))
 0232                    message = channelErrorException.PeerMessage;
 233            }
 234
 0235            if (errorException is not ConnectionException)
 236            {
 0237                _logger.LogTrace("Sending error message to peer {peer}. ChannelId: {channelId}, Message: {message}",
 0238                                 PeerCompactPubKey, channelId, message);
 239
 0240                _ = Task.Run(() => _messageService.SendMessageAsync(
 0241                                 new ErrorMessage(new ErrorPayload(channelId, message))));
 242
 0243                return;
 244            }
 245        }
 0246        else if (exception is WarningException warningException)
 247        {
 0248            ChannelId? channelId = null;
 0249            var message = warningException.Message;
 250
 0251            if (warningException is ChannelWarningException channelWarningException)
 252            {
 0253                channelId = channelWarningException.ChannelId;
 0254                if (!string.IsNullOrWhiteSpace(channelWarningException.PeerMessage))
 0255                    message = channelWarningException.PeerMessage;
 256            }
 257
 0258            _logger.LogTrace("Sending warning message to peer {peer}. ChannelId: {channelId}, Message: {message}",
 0259                             PeerCompactPubKey, channelId, message);
 260
 0261            _ = Task.Run(() => _messageService.SendMessageAsync(
 0262                             new WarningMessage(new ErrorPayload(channelId, message))));
 263        }
 264
 265        // Forward the exception to subscribers
 0266        ExceptionRaised?.Invoke(this, exception);
 267
 268        // Disconnect if not already disconnecting
 0269        if (mustDisconnect && !_cts.IsCancellationRequested)
 270        {
 0271            _messageService.OnMessageReceived -= HandleMessageReceived;
 0272            _messageService.OnExceptionRaised -= HandleExceptionRaised;
 0273            _pingPongService.DisconnectEvent -= HandleExceptionRaised;
 274
 0275            _logger.LogWarning(exception, "We're disconnecting peer {peer} because of an exception", PeerCompactPubKey);
 0276            Disconnect();
 277        }
 0278    }
 279}