< Summary - Combined Code Coverage

Information
Class: NLightning.Application.Node.Managers.PeerManager
Assembly: NLightning.Application
File(s): /home/runner/work/nlightning/nlightning/src/NLightning.Application/Node/Managers/PeerManager.cs
Tag: 38_17925369700
Line coverage
69%
Covered lines: 105
Uncovered lines: 47
Coverable lines: 152
Total lines: 310
Line coverage: 69%
Branch coverage
55%
Covered branches: 30
Total branches: 54
Branch coverage: 55.5%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
.ctor(...) | 100% | 1 | 1 | 100%
StartAsync() | 37.5% | 18.78 | 10 | 55.56%
StopAsync() | 62.5% | 21.02 | 8 | 41.18%
ConnectToPeerAsync() | 100% | 1 | 1 | 100%
DisconnectPeer(...) | 75% | 4.07 | 4 | 83.33%
ConnectToPeerAsync() | 50% | 4.02 | 4 | 88.89%
HandleNewPeerConnected(...) | 100% | 1 | 1 | 100%
HandlePeerDisconnection(...) | 50% | 2.04 | 2 | 77.78%
HandlePeerChannelMessage(...) | 50% | 4.43 | 4 | 70%
HandleChannelMessageResponseAsync() | 70% | 26.25 | 20 | 75%
HandleResponseMessageReady(...) | 0% | 20 | 4 | 0%

File(s)

/home/runner/work/nlightning/nlightning/src/NLightning.Application/Node/Managers/PeerManager.cs

# | Line | Line coverage
 1using Microsoft.Extensions.DependencyInjection;
 2using Microsoft.Extensions.Logging;
 3
 4namespace NLightning.Application.Node.Managers;
 5
 6using Domain.Channels.Enums;
 7using Domain.Channels.Events;
 8using Domain.Channels.Interfaces;
 9using Domain.Crypto.ValueObjects;
 10using Domain.Exceptions;
 11using Domain.Node.Events;
 12using Domain.Node.Interfaces;
 13using Domain.Node.Models;
 14using Domain.Node.ValueObjects;
 15using Domain.Persistence.Interfaces;
 16using Domain.Protocol.Constants;
 17using Domain.Protocol.Interfaces;
 18using Infrastructure.Transport.Events;
 19using Infrastructure.Transport.Interfaces;
 20
 21/// <summary>
 22/// Service for managing peers.
 23/// </summary>
 24/// <remarks>
 25/// This class is used to manage peers in the network.
 26/// </remarks>
 27/// <seealso cref="IPeerManager" />
 28public sealed class PeerManager : IPeerManager
 29{
 30    private readonly IChannelManager _channelManager;
 31    private readonly ILogger<PeerManager> _logger;
 32    private readonly IPeerServiceFactory _peerServiceFactory;
 33    private readonly ITcpService _tcpService;
 34    private readonly IServiceProvider _serviceProvider;
 5235    private readonly Dictionary<CompactPubKey, PeerModel> _peers = [];
 36
 37    private CancellationTokenSource? _cts;
 38
 5239    public PeerManager(IChannelManager channelManager, ILogger<PeerManager> logger,
 5240                       IPeerServiceFactory peerServiceFactory, ITcpService tcpService, IServiceProvider serviceProvider)
 41    {
 5242        _channelManager = channelManager;
 5243        _logger = logger;
 5244        _peerServiceFactory = peerServiceFactory;
 5245        _tcpService = tcpService;
 5246        _serviceProvider = serviceProvider;
 5247    }
 48
 49    public async Task StartAsync(CancellationToken cancellationToken)
 50    {
 1651        _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
 52
 1653        _tcpService.OnNewPeerConnected += HandleNewPeerConnected;
 54
 1655        _channelManager.OnResponseMessageReady += HandleResponseMessageReady;
 56
 57        // Load peers and initialize the channel manager
 1658        using var scope = _serviceProvider.CreateScope();
 1659        using var uow = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
 1660        var peers = await uow.GetPeersForStartupAsync();
 3261        foreach (var peer in peers)
 62        {
 063            await ConnectToPeerAsync(peer.PeerAddressInfo, uow);
 064            if (!_peers.TryGetValue(peer.NodeId, out _))
 65            {
 066                _logger.LogWarning("Unable to connect to peer {PeerId} on startup", peer.NodeId);
 67                // TODO: Handle this case, maybe retry or log more details
 068                continue;
 69            }
 70
 71            // Register channels with peer
 072            if (peer.Channels is not { Count: > 0 })
 73                continue;
 74
 75            // Only register channels that are not closed or stale
 076            foreach (var channel in peer.Channels.Where(c => c.State != ChannelState.Closed))
 77                // We don't care about the result here, as we just want to register the existing channels
 078                _ = _channelManager.RegisterExistingChannelAsync(channel);
 079        }
 80
 1681        await uow.SaveChangesAsync();
 82
 1683        await _tcpService.StartListeningAsync(_cts.Token);
 1684    }
 85
 86    public async Task StopAsync()
 87    {
 488        if (_cts is null)
 089            throw new InvalidOperationException($"{nameof(PeerManager)} is not running");
 90
 1691        foreach (var peerKey in _peers.Keys)
 92            try
 93            {
 494                DisconnectPeer(peerKey);
 495            }
 096            catch (Exception e)
 97            {
 098                _logger.LogWarning(e, "Error disconnecting peer {Peer}", peerKey);
 099            }
 100
 101        try
 102        {
 103            // Give it a 5-second timeout to disconnect all peers
 4104            var timeoutTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
 4105            while (_peers.Count > 0 && !_cts.IsCancellationRequested)
 4106                await Task.Delay(TimeSpan.FromSeconds(1), timeoutTokenSource.Token);
 0107        }
 0108        catch (TaskCanceledException)
 109        {
 0110            _logger.LogWarning("Timeout while waiting for peers to disconnect");
 0111        }
 112
 0113        await _cts.CancelAsync();
 0114    }
 115
 116    /// <inheritdoc />
 117    /// <exception cref="ConnectionException">Thrown when the connection to the peer fails.</exception>
 118    public async Task ConnectToPeerAsync(PeerAddressInfo peerAddressInfo)
 119    {
 8120        using var scope = _serviceProvider.CreateScope();
 8121        using var uow = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
 122
 8123        await ConnectToPeerAsync(peerAddressInfo, uow);
 124
 4125        await uow.SaveChangesAsync();
 4126    }
 127
 128    /// <inheritdoc />
 129    public void DisconnectPeer(CompactPubKey pubKey)
 130    {
 16131        if (_peers.TryGetValue(pubKey, out var peer))
 132        {
 12133            if (peer.TryGetPeerService(out var peerService))
 134            {
 12135                peerService.Disconnect();
 136            }
 137            else
 138            {
 0139                _logger.LogWarning("PeerService not found for {Peer}", pubKey);
 140            }
 141        }
 142        else
 143        {
 4144            _logger.LogWarning("Peer {Peer} not found", pubKey);
 145        }
 4146    }
 147
 148    private async Task ConnectToPeerAsync(PeerAddressInfo peerAddressInfo, IUnitOfWork uow)
 149    {
 150        // Connect to the peer
 8151        var connectedPeer = await _tcpService.ConnectToPeerAsync(peerAddressInfo);
 152
 4153        var peerService = await _peerServiceFactory.CreateConnectedPeerAsync(connectedPeer.CompactPubKey,
 4154                                                                             connectedPeer.TcpClient);
 4155        peerService.OnDisconnect += HandlePeerDisconnection;
 4156        peerService.OnChannelMessageReceived += HandlePeerChannelMessage;
 157
 158        // Check if the peer wants us to use a different host and port
 4159        var host = connectedPeer.Host;
 4160        var port = connectedPeer.Port; // Default port for Lightning Network
 4161        if (peerService.PreferredHost is not null && peerService.PreferredPort.HasValue)
 162        {
 0163            host = peerService.PreferredHost;
 0164            port = peerService.PreferredPort.Value;
 165        }
 166
 4167        var peer = new PeerModel(connectedPeer.CompactPubKey, host, port)
 4168        {
 4169            LastSeenAt = DateTime.UtcNow
 4170        };
 4171        peer.SetPeerService(peerService);
 172
 4173        _peers.Add(connectedPeer.CompactPubKey, peer);
 174
 4175        uow.PeerDbRepository.AddOrUpdateAsync(peer).GetAwaiter().GetResult();
 4176    }
 177
 178    private void HandleNewPeerConnected(object? _, NewPeerConnectedEventArgs args)
 179    {
 180        try
 181        {
 182            // Create the peer
 8183            var peerService = _peerServiceFactory.CreateConnectingPeerAsync(args.TcpClient).GetAwaiter().GetResult();
 4184            peerService.OnDisconnect += HandlePeerDisconnection;
 4185            peerService.OnChannelMessageReceived += HandlePeerChannelMessage;
 186
 4187            _logger.LogTrace("PeerService created for peer {PeerPubKey}", peerService.PeerPubKey);
 188
 4189            var peer = new PeerModel(peerService.PeerPubKey, args.Host, args.Port)
 4190            {
 4191                LastSeenAt = DateTime.UtcNow
 4192            };
 4193            peer.SetPeerService(peerService);
 194
 4195            _peers.Add(peerService.PeerPubKey, peer);
 4196        }
 4197        catch (Exception e)
 198        {
 4199            _logger.LogError(e, "Error handling new peer connection from {Host}:{Port}", args.Host, args.Port);
 4200        }
 8201    }
 202
 203    private void HandlePeerDisconnection(object? sender, PeerDisconnectedEventArgs args)
 204    {
 4205        ArgumentNullException.ThrowIfNull(args);
 206
 4207        _peers.Remove(args.PeerPubKey);
 4208        _logger.LogInformation("Peer {Peer} disconnected", args.PeerPubKey);
 209
 4210        if (sender is IPeerService peerService)
 211        {
 0212            peerService.OnDisconnect -= HandlePeerDisconnection;
 0213            peerService.OnChannelMessageReceived -= HandlePeerChannelMessage;
 214        }
 215        else
 216        {
 4217            _logger.LogWarning("Peer {Peer} disconnected, but we were unable to detach event handlers",
 4218                               args.PeerPubKey);
 219        }
 4220    }
 221
 222    private void HandlePeerChannelMessage(object? _, ChannelMessageEventArgs args)
 223    {
 16224        ArgumentNullException.ThrowIfNull(args);
 225
 16226        if (!_peers.TryGetValue(args.PeerPubKey, out var peer))
 0227            throw new ConnectionException($"Peer {args.PeerPubKey} not found while handling channel message");
 228
 16229        if (!peer.TryGetPeerService(out var peerService))
 0230            throw new ConnectionException(
 0231                $"PeerService not found for peer {args.PeerPubKey} while handling channel message");
 232
 16233        _channelManager.HandleChannelMessageAsync(args.Message, peerService.Features, peerService.PeerPubKey)
 32234                       .ContinueWith(task => HandleChannelMessageResponseAsync(task, peerService.PeerPubKey,
 32235                                                                               args.Message.Type));
 16236    }
 237
 238    private async Task HandleChannelMessageResponseAsync(Task<IChannelMessage?> task, CompactPubKey peerPubKey,
 239                                                         MessageTypes messageType)
 240    {
 16241        if (!_peers.TryGetValue(peerPubKey, out var peer))
 0242            throw new ConnectionException($"Peer {peerPubKey} not found while handling channel response message");
 243
 16244        if (!peer.TryGetPeerService(out var peerService))
 0245            throw new ConnectionException(
 0246                $"PeerService not found for peer {peerPubKey} while handling channel response message");
 247
 16248        if (task.IsFaulted)
 249        {
 8250            if (task.Exception is { InnerException: ChannelErrorException cee })
 251            {
 4252                _logger.LogError(
 4253                    "Error handling channel message ({messageType}) from peer {peer}: {message}",
 4254                    Enum.GetName(messageType), peerService.PeerPubKey,
 4255                    !string.IsNullOrEmpty(cee.PeerMessage)
 4256                        ? cee.PeerMessage
 4257                        : cee.Message);
 258
 4259                DisconnectPeer(peerService.PeerPubKey);
 4260                return;
 261            }
 262
 4263            if (task.Exception is { InnerException: ChannelWarningException cwe })
 264            {
 4265                _logger.LogWarning(
 4266                    "Error handling channel message ({messageType}) from peer {peer}: {message}",
 4267                    Enum.GetName(messageType), peerService.PeerPubKey,
 4268                    !string.IsNullOrEmpty(cwe.PeerMessage)
 4269                        ? cwe.PeerMessage
 4270                        : cwe.Message);
 271
 4272                return;
 273            }
 274
 0275            _logger.LogError(
 0276                task.Exception, "Error handling channel message ({messageType}) from peer {peer}",
 0277                Enum.GetName(messageType), peerService.PeerPubKey);
 278
 0279            DisconnectPeer(peerService.PeerPubKey);
 0280            return;
 281        }
 282
 8283        var replyMessage = task.Result;
 8284        if (replyMessage is not null)
 285        {
 8286            await peerService.SendMessageAsync(replyMessage);
 287        }
 16288    }
 289
 290    private void HandleResponseMessageReady(object? sender, ChannelResponseMessageEventArgs args)
 291    {
 0292        ArgumentNullException.ThrowIfNull(args);
 293
 294        // Find PeerService by CompactPubKey
 0295        if (!_peers.TryGetValue(args.PeerPubKey, out var peer))
 0296            throw new ConnectionException($"Peer {args.PeerPubKey} not found while handling response message");
 297
 0298        if (!peer.TryGetPeerService(out var peerService))
 0299            throw new ConnectionException(
 0300                $"PeerService not found for peer {args.PeerPubKey} while handling response message");
 301
 302        // Send the response message to the peer
 0303        peerService.SendMessageAsync(args.ResponseMessage)
 0304                   .ContinueWith(task =>
 0305                    {
 0306                        _logger.LogError(task.Exception, "Failed to send response message to peer {Peer}",
 0307                                         args.PeerPubKey);
 0308                    }, TaskContinuationOptions.OnlyOnFaulted);
 0309    }
 310}