< Summary - Combined Code Coverage

Information
Class: NLightning.Application.Node.Managers.PeerManager
Assembly: NLightning.Application
File(s): /home/runner/work/nlightning/nlightning/src/NLightning.Application/Node/Managers/PeerManager.cs
Tag: 36_15743069263
Line coverage
70%
Covered lines: 100
Uncovered lines: 42
Coverable lines: 142
Total lines: 291
Line coverage: 70.4%
Branch coverage
55%
Covered branches: 29
Total branches: 52
Branch coverage: 55.7%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
StartAsync()37.5%18.781055.56%
StopAsync()62.5%17.99846.15%
ConnectToPeerAsync()100%11100%
DisconnectPeer(...)75%4.07483.33%
ConnectToPeerAsync()50%4.02488.89%
HandleNewPeerConnected(...)100%11100%
HandlePeerDisconnection(...)100%11100%
HandlePeerChannelMessage(...)50%4.43470%
HandleChannelMessageResponseAsync()70%26.252075%
HandleResponseMessageReady(...)0%2040%

File(s)

/home/runner/work/nlightning/nlightning/src/NLightning.Application/Node/Managers/PeerManager.cs

#LineLine coverage
 1using Microsoft.Extensions.DependencyInjection;
 2using Microsoft.Extensions.Logging;
 3
 4namespace NLightning.Application.Node.Managers;
 5
 6using Domain.Channels.Enums;
 7using Domain.Channels.Events;
 8using Domain.Channels.Interfaces;
 9using Domain.Crypto.ValueObjects;
 10using Domain.Exceptions;
 11using Domain.Node.Events;
 12using Domain.Node.Interfaces;
 13using Domain.Node.Models;
 14using Domain.Node.ValueObjects;
 15using Domain.Persistence.Interfaces;
 16using Domain.Protocol.Constants;
 17using Domain.Protocol.Interfaces;
 18using Infrastructure.Transport.Events;
 19using Infrastructure.Transport.Interfaces;
 20
 21/// <summary>
 22/// Service for managing peers.
 23/// </summary>
 24/// <remarks>
 25/// This class is used to manage peers in the network.
 26/// </remarks>
 27/// <seealso cref="IPeerManager" />
 28public sealed class PeerManager : IPeerManager
 29{
 30    private readonly IChannelManager _channelManager;
 31    private readonly ILogger<PeerManager> _logger;
 32    private readonly IPeerServiceFactory _peerServiceFactory;
 33    private readonly ITcpService _tcpService;
 34    private readonly IServiceProvider _serviceProvider;
 5235    private readonly Dictionary<CompactPubKey, PeerModel> _peers = [];
 36
 37    private CancellationTokenSource? _cts;
 38
 5239    public PeerManager(IChannelManager channelManager, ILogger<PeerManager> logger,
 5240                       IPeerServiceFactory peerServiceFactory, ITcpService tcpService, IServiceProvider serviceProvider)
 41    {
 5242        _channelManager = channelManager;
 5243        _logger = logger;
 5244        _peerServiceFactory = peerServiceFactory;
 5245        _tcpService = tcpService;
 5246        _serviceProvider = serviceProvider;
 5247    }
 48
 49    public async Task StartAsync(CancellationToken cancellationToken)
 50    {
 1651        _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
 52
 1653        _tcpService.OnNewPeerConnected += HandleNewPeerConnected;
 54
 1655        _channelManager.OnResponseMessageReady += HandleResponseMessageReady;
 56
 57        // Load peers and initialize the channel manager
 1658        using var scope = _serviceProvider.CreateScope();
 1659        using var uow = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
 1660        var peers = await uow.GetPeersForStartupAsync();
 3261        foreach (var peer in peers)
 62        {
 063            await ConnectToPeerAsync(peer.PeerAddressInfo, uow);
 064            if (!_peers.TryGetValue(peer.NodeId, out _))
 65            {
 066                _logger.LogWarning("Unable to connect to peer {PeerId} on startup", peer.NodeId);
 67                // TODO: Handle this case, maybe retry or log more details
 068                continue;
 69            }
 70
 71            // Register channels with peer
 072            if (peer.Channels is not { Count: > 0 })
 73                continue;
 74
 75            // Only register channels that are not closed or stale
 076            foreach (var channel in peer.Channels.Where(c => c.State != ChannelState.Closed))
 77                // We don't care about the result here, as we just want to register the existing channels
 078                _ = _channelManager.RegisterExistingChannelAsync(channel);
 079        }
 80
 1681        await uow.SaveChangesAsync();
 82
 1683        await _tcpService.StartListeningAsync(_cts.Token);
 1684    }
 85
 86    public async Task StopAsync()
 87    {
 488        if (_cts is null)
 089            throw new InvalidOperationException($"{nameof(PeerManager)} is not running");
 90
 1691        foreach (var peerKey in _peers.Keys)
 492            DisconnectPeer(peerKey);
 93
 94        try
 95        {
 96            // Give it a 5-second timeout to disconnect all peers
 497            var timeoutTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
 498            while (_peers.Count > 0 && !_cts.IsCancellationRequested)
 499                await Task.Delay(TimeSpan.FromSeconds(1), timeoutTokenSource.Token);
 0100        }
 0101        catch (TaskCanceledException)
 102        {
 0103            _logger.LogWarning("Timeout while waiting for peers to disconnect");
 0104        }
 105
 0106        await _cts.CancelAsync();
 0107    }
 108
 109    /// <inheritdoc />
 110    /// <exception cref="ConnectionException">Thrown when the connection to the peer fails.</exception>
 111    public async Task ConnectToPeerAsync(PeerAddressInfo peerAddressInfo)
 112    {
 8113        using var scope = _serviceProvider.CreateScope();
 8114        using var uow = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
 115
 8116        await ConnectToPeerAsync(peerAddressInfo, uow);
 117
 4118        await uow.SaveChangesAsync();
 4119    }
 120
 121    /// <inheritdoc />
 122    public void DisconnectPeer(CompactPubKey pubKey)
 123    {
 16124        if (_peers.TryGetValue(pubKey, out var peer))
 125        {
 12126            if (peer.TryGetPeerService(out var peerService))
 127            {
 12128                peerService.Disconnect();
 129            }
 130            else
 131            {
 0132                _logger.LogWarning("PeerService not found for {Peer}", pubKey);
 133            }
 134        }
 135        else
 136        {
 4137            _logger.LogWarning("Peer {Peer} not found", pubKey);
 138        }
 4139    }
 140
 141    private async Task ConnectToPeerAsync(PeerAddressInfo peerAddressInfo, IUnitOfWork uow)
 142    {
 143        // Connect to the peer
 8144        var connectedPeer = await _tcpService.ConnectToPeerAsync(peerAddressInfo);
 145
 4146        var peerService = await _peerServiceFactory.CreateConnectedPeerAsync(connectedPeer.CompactPubKey,
 4147                                                                             connectedPeer.TcpClient);
 4148        peerService.OnDisconnect += HandlePeerDisconnection;
 4149        peerService.OnChannelMessageReceived += HandlePeerChannelMessage;
 150
 151        // Check if the peer wants us to use a different host and port
 4152        var host = connectedPeer.Host;
 4153        var port = connectedPeer.Port; // Default port for Lightning Network
 4154        if (peerService.PreferredHost is not null && peerService.PreferredPort.HasValue)
 155        {
 0156            host = peerService.PreferredHost;
 0157            port = peerService.PreferredPort.Value;
 158        }
 159
 4160        var peer = new PeerModel(connectedPeer.CompactPubKey, host, port)
 4161        {
 4162            LastSeenAt = DateTime.UtcNow
 4163        };
 4164        peer.SetPeerService(peerService);
 165
 4166        _peers.Add(connectedPeer.CompactPubKey, peer);
 167
 4168        uow.PeerDbRepository.AddOrUpdateAsync(peer).GetAwaiter().GetResult();
 4169    }
 170
 171    private void HandleNewPeerConnected(object? _, NewPeerConnectedEventArgs args)
 172    {
 173        try
 174        {
 175            // Create the peer
 8176            var peerService = _peerServiceFactory.CreateConnectingPeerAsync(args.TcpClient).GetAwaiter().GetResult();
 177
 4178            peerService.OnDisconnect += HandlePeerDisconnection;
 4179            peerService.OnChannelMessageReceived += HandlePeerChannelMessage;
 180
 4181            var peer = new PeerModel(peerService.PeerPubKey, args.Host, args.Port)
 4182            {
 4183                LastSeenAt = DateTime.UtcNow
 4184            };
 4185            peer.SetPeerService(peerService);
 186
 4187            _peers.Add(peerService.PeerPubKey, peer);
 4188        }
 4189        catch (Exception e)
 190        {
 4191            _logger.LogError(e, "Error handling new peer connection from {Host}:{Port}", args.Host, args.Port);
 4192        }
 8193    }
 194
 195    private void HandlePeerDisconnection(object? _, PeerDisconnectedEventArgs args)
 196    {
 4197        ArgumentNullException.ThrowIfNull(args);
 198
 4199        _peers.Remove(args.PeerPubKey);
 4200        _logger.LogInformation("Peer {Peer} disconnected", args.PeerPubKey);
 4201    }
 202
 203    private void HandlePeerChannelMessage(object? _, ChannelMessageEventArgs args)
 204    {
 16205        ArgumentNullException.ThrowIfNull(args);
 206
 16207        if (!_peers.TryGetValue(args.PeerPubKey, out var peer))
 0208            throw new ConnectionException($"Peer {args.PeerPubKey} not found while handling channel message");
 209
 16210        if (!peer.TryGetPeerService(out var peerService))
 0211            throw new ConnectionException(
 0212                $"PeerService not found for peer {args.PeerPubKey} while handling channel message");
 213
 16214        _channelManager.HandleChannelMessageAsync(args.Message, peerService.Features, peerService.PeerPubKey)
 32215                       .ContinueWith(task => HandleChannelMessageResponseAsync(task, peerService.PeerPubKey,
 32216                                                                               args.Message.Type));
 16217    }
 218
 219    private async Task HandleChannelMessageResponseAsync(Task<IChannelMessage?> task, CompactPubKey peerPubKey,
 220                                                         MessageTypes messageType)
 221    {
 16222        if (!_peers.TryGetValue(peerPubKey, out var peer))
 0223            throw new ConnectionException($"Peer {peerPubKey} not found while handling channel response message");
 224
 16225        if (!peer.TryGetPeerService(out var peerService))
 0226            throw new ConnectionException(
 0227                $"PeerService not found for peer {peerPubKey} while handling channel response message");
 228
 16229        if (task.IsFaulted)
 230        {
 8231            if (task.Exception is { InnerException: ChannelErrorException cee })
 232            {
 4233                _logger.LogError(
 4234                    "Error handling channel message ({messageType}) from peer {peer}: {message}",
 4235                    Enum.GetName(messageType), peerService.PeerPubKey,
 4236                    !string.IsNullOrEmpty(cee.PeerMessage)
 4237                        ? cee.PeerMessage
 4238                        : cee.Message);
 239
 4240                DisconnectPeer(peerService.PeerPubKey);
 4241                return;
 242            }
 243
 4244            if (task.Exception is { InnerException: ChannelWarningException cwe })
 245            {
 4246                _logger.LogWarning(
 4247                    "Error handling channel message ({messageType}) from peer {peer}: {message}",
 4248                    Enum.GetName(messageType), peerService.PeerPubKey,
 4249                    !string.IsNullOrEmpty(cwe.PeerMessage)
 4250                        ? cwe.PeerMessage
 4251                        : cwe.Message);
 252
 4253                return;
 254            }
 255
 0256            _logger.LogError(
 0257                task.Exception, "Error handling channel message ({messageType}) from peer {peer}",
 0258                Enum.GetName(messageType), peerService.PeerPubKey);
 259
 0260            DisconnectPeer(peerService.PeerPubKey);
 0261            return;
 262        }
 263
 8264        var replyMessage = task.Result;
 8265        if (replyMessage is not null)
 266        {
 8267            await peerService.SendMessageAsync(replyMessage);
 268        }
 16269    }
 270
 271    private void HandleResponseMessageReady(object? sender, ChannelResponseMessageEventArgs args)
 272    {
 0273        ArgumentNullException.ThrowIfNull(args);
 274
 275        // Find PeerService by CompactPubKey
 0276        if (!_peers.TryGetValue(args.PeerPubKey, out var peer))
 0277            throw new ConnectionException($"Peer {args.PeerPubKey} not found while handling response message");
 278
 0279        if (!peer.TryGetPeerService(out var peerService))
 0280            throw new ConnectionException(
 0281                $"PeerService not found for peer {args.PeerPubKey} while handling response message");
 282
 283        // Send the response message to the peer
 0284        peerService.SendMessageAsync(args.ResponseMessage)
 0285                   .ContinueWith(task =>
 0286                    {
 0287                        _logger.LogError(task.Exception, "Failed to send response message to peer {Peer}",
 0288                                         args.PeerPubKey);
 0289                    }, TaskContinuationOptions.OnlyOnFaulted);
 0290    }
 291}