< Summary - Combined Code Coverage

Information
Class: NLightning.Infrastructure.Node.Services.PeerCommunicationService
Assembly: NLightning.Infrastructure
File(s): /home/runner/work/nlightning/nlightning/src/NLightning.Infrastructure/Node/Services/PeerCommunicationService.cs
Tag: 36_15743069263
Line coverage
0%
Covered lines: 0
Uncovered lines: 107
Coverable lines: 107
Total lines: 245
Line coverage: 0%
Branch coverage
0%
Covered branches: 0
Total branches: 44
Branch coverage: 0%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%210%
get_IsConnected()100%210%
get_PeerCompactPubKey()100%210%
InitializeAsync()0%620%
SendMessageAsync()100%210%
Disconnect()0%620%
SetupPingPongService()100%210%
HandlePongReceived(...)100%210%
HandlePingMessageReady(...)0%620%
HandleMessageReceived(...)0%272160%
HandlePingAsync()100%210%
HandleExceptionRaised(...)100%210%
RaiseException(...)0%342180%

File(s)

/home/runner/work/nlightning/nlightning/src/NLightning.Infrastructure/Node/Services/PeerCommunicationService.cs

#LineLine coverage
 1using Microsoft.Extensions.DependencyInjection;
 2using Microsoft.Extensions.Logging;
 3
 4namespace NLightning.Infrastructure.Node.Services;
 5
 6using Domain.Channels.ValueObjects;
 7using Domain.Crypto.ValueObjects;
 8using Domain.Exceptions;
 9using Domain.Node.Interfaces;
 10using Domain.Persistence.Interfaces;
 11using Domain.Protocol.Constants;
 12using Domain.Protocol.Interfaces;
 13using Domain.Protocol.Messages;
 14using Domain.Protocol.Payloads;
 15
 16/// <summary>
 17/// Service for communication with a single peer.
 18/// </summary>
 19public class PeerCommunicationService : IPeerCommunicationService
 20{
 021    private readonly CancellationTokenSource _cancellationTokenSource = new();
 22    private readonly ILogger<PeerCommunicationService> _logger;
 23    private readonly IMessageService _messageService;
 24    private readonly IPingPongService _pingPongService;
 25    private readonly IServiceProvider _serviceProvider;
 26    private readonly IMessageFactory _messageFactory;
 27    private bool _isInitialized;
 28
 29    /// <inheritdoc />
 30    public event EventHandler<IMessage?>? MessageReceived;
 31
 32    /// <inheritdoc />
 33    public event EventHandler? DisconnectEvent;
 34
 35    /// <inheritdoc />
 36    public event EventHandler<Exception>? ExceptionRaised;
 37
 38    /// <inheritdoc />
 039    public bool IsConnected => _messageService.IsConnected;
 40
 41    /// <inheritdoc />
 042    public CompactPubKey PeerCompactPubKey { get; }
 43
 44    /// <summary>
 45    /// Initializes a new instance of the <see cref="PeerCommunicationService"/> class.
 46    /// </summary>
 47    /// <param name="logger">The logger.</param>
 48    /// <param name="messageService">The message service.</param>
 49    /// <param name="messageFactory">The message factory.</param>
 50    /// <param name="peerCompactPubKey">The peer's public key.</param>
 51    /// <param name="pingPongService">The ping pong service.</param>
 52    /// <param name="serviceProvider">The service provider.</param>
 053    public PeerCommunicationService(ILogger<PeerCommunicationService> logger, IMessageService messageService,
 054                                    IMessageFactory messageFactory, CompactPubKey peerCompactPubKey,
 055                                    IPingPongService pingPongService, IServiceProvider serviceProvider)
 56    {
 057        _logger = logger;
 058        _messageService = messageService;
 059        _messageFactory = messageFactory;
 060        PeerCompactPubKey = peerCompactPubKey;
 061        _pingPongService = pingPongService;
 062        _serviceProvider = serviceProvider;
 63
 064        _messageService.OnMessageReceived += HandleMessageReceived;
 065        _messageService.OnExceptionRaised += HandleExceptionRaised;
 066        _pingPongService.DisconnectEvent += HandleExceptionRaised;
 067    }
 68
 69    /// <inheritdoc />
 70    public async Task InitializeAsync(TimeSpan networkTimeout)
 71    {
 72        // Always send an init message upon connection
 073        _logger.LogTrace("Sending init message to peer {peer}", PeerCompactPubKey);
 074        var initMessage = _messageFactory.CreateInitMessage();
 075        await _messageService.SendMessageAsync(initMessage, _cancellationTokenSource.Token);
 76
 77        // Wait for an init message
 078        _logger.LogTrace("Waiting for init message from peer {peer}", PeerCompactPubKey);
 79
 80        // Set timeout to close connection if the other peer doesn't send an init message
 081        _ = Task.Delay(networkTimeout, _cancellationTokenSource.Token).ContinueWith(task =>
 082        {
 083            if (!task.IsCanceled && !_isInitialized)
 084            {
 085                RaiseException(
 086                    new ConnectionException($"Peer {PeerCompactPubKey} did not send init message after timeout"));
 087            }
 088        });
 89
 090        if (!_messageService.IsConnected)
 91        {
 092            throw new ConnectionException($"Failed to connect to peer {PeerCompactPubKey}");
 93        }
 94
 95        // Set up ping service to keep connection alive
 096        SetupPingPongService();
 097    }
 98
 99    /// <inheritdoc />
 100    public async Task SendMessageAsync(IMessage message, CancellationToken cancellationToken = default)
 101    {
 102        try
 103        {
 0104            await _messageService.SendMessageAsync(message, cancellationToken);
 0105        }
 0106        catch (Exception ex)
 107        {
 0108            RaiseException(new ConnectionException($"Failed to send message to peer {PeerCompactPubKey}", ex));
 0109        }
 0110    }
 111
 112    /// <inheritdoc />
 113    public void Disconnect()
 114    {
 0115        _logger.LogInformation("Disconnecting peer {peer}", PeerCompactPubKey);
 0116        _cancellationTokenSource.Cancel();
 0117        _messageService.Dispose();
 118
 0119        DisconnectEvent?.Invoke(this, EventArgs.Empty);
 0120    }
 121
 122    private void SetupPingPongService()
 123    {
 0124        _pingPongService.OnPingMessageReady += HandlePingMessageReady;
 0125        _pingPongService.OnPongReceived += HandlePongReceived;
 126
 127        // Setup Ping to keep connection alive
 0128        _ = _pingPongService.StartPingAsync(_cancellationTokenSource.Token).ContinueWith(task =>
 0129        {
 0130            if (task.IsFaulted)
 0131            {
 0132                RaiseException(new ConnectionException($"Failed to start ping service for peer {PeerCompactPubKey}",
 0133                                                       task.Exception));
 0134            }
 0135        });
 136
 0137        _logger.LogInformation("Ping service started for peer {peer}", PeerCompactPubKey);
 0138    }
 139
 140    private void HandlePongReceived(object? sender, EventArgs e)
 141    {
 0142        using var scope = _serviceProvider.CreateScope();
 0143        using var uow = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
 144
 0145        uow.PeerDbRepository.UpdatePeerLastSeenAsync(PeerCompactPubKey).GetAwaiter().GetResult();
 0146        uow.SaveChanges();
 0147    }
 148
 149    private void HandlePingMessageReady(object? sender, IMessage pingMessage)
 150    {
 151        // We can only send ping messages if the peer is initialized
 0152        if (!_isInitialized)
 0153            return;
 154
 155        try
 156        {
 0157            _messageService.SendMessageAsync(pingMessage, _cancellationTokenSource.Token).GetAwaiter().GetResult();
 0158        }
 0159        catch (Exception ex)
 160        {
 0161            RaiseException(new ConnectionException($"Failed to send ping message to peer {PeerCompactPubKey}", ex));
 0162        }
 0163    }
 164
 165    private void HandleMessageReceived(object? sender, IMessage? message)
 166    {
 0167        if (message is null)
 168        {
 0169            return;
 170        }
 171
 0172        if (!_isInitialized && message.Type == MessageTypes.Init)
 173        {
 0174            _isInitialized = true;
 175        }
 176
 177        // Forward the message to subscribers
 0178        MessageReceived?.Invoke(this, message);
 179
 180        // Handle ping messages internally
 0181        if (_isInitialized && message.Type == MessageTypes.Ping)
 182        {
 0183            _ = HandlePingAsync(message);
 184        }
 0185        else if (_isInitialized && message.Type == MessageTypes.Pong)
 186        {
 0187            _pingPongService.HandlePong(message);
 188        }
 0189    }
 190
 191    private async Task HandlePingAsync(IMessage pingMessage)
 192    {
 0193        var pongMessage = _messageFactory.CreatePongMessage(pingMessage);
 0194        await _messageService.SendMessageAsync(pongMessage);
 0195    }
 196
 197    private void HandleExceptionRaised(object? sender, Exception e)
 198    {
 0199        RaiseException(e);
 0200    }
 201
 202    private void RaiseException(Exception exception)
 203    {
 0204        var mustDisconnect = false;
 0205        if (exception is ErrorException errorException)
 206        {
 0207            ChannelId? channelId = null;
 0208            var message = errorException.Message;
 209
 0210            if (errorException is ChannelErrorException channelErrorException)
 211            {
 0212                channelId = channelErrorException.ChannelId;
 0213                if (!string.IsNullOrWhiteSpace(channelErrorException.PeerMessage))
 0214                    message = channelErrorException.PeerMessage;
 215            }
 216
 0217            _messageService.SendMessageAsync(new ErrorMessage(new ErrorPayload(channelId, message)));
 0218            mustDisconnect = true;
 219        }
 0220        else if (exception is WarningException warningException)
 221        {
 0222            ChannelId? channelId = null;
 0223            var message = warningException.Message;
 224
 0225            if (warningException is ChannelWarningException channelWarningException)
 226            {
 0227                channelId = channelWarningException.ChannelId;
 0228                if (!string.IsNullOrWhiteSpace(channelWarningException.PeerMessage))
 0229                    message = channelWarningException.PeerMessage;
 230            }
 231
 0232            _messageService.SendMessageAsync(new WarningMessage(new ErrorPayload(channelId, message)));
 233        }
 234
 0235        _logger.LogError(exception, "Exception occurred with peer {peer}. {exceptionMessage}", PeerCompactPubKey,
 0236                         exception.Message);
 237
 238        // Forward the exception to subscribers
 0239        ExceptionRaised?.Invoke(this, exception);
 240
 241        // Disconnect if not already disconnecting
 0242        if (mustDisconnect && !_cancellationTokenSource.IsCancellationRequested)
 0243            Disconnect();
 0244    }
 245}