< Summary - Combined Code Coverage

Information
Class: NLightning.Infrastructure.Bitcoin.Wallet.BlockchainMonitorService
Assembly: NLightning.Infrastructure.Bitcoin
File(s): /home/runner/work/nlightning/nlightning/src/NLightning.Infrastructure.Bitcoin/Wallet/BlockchainMonitorService.cs
Tag: 36_15743069263
Line coverage
77%
Covered lines: 154
Uncovered lines: 45
Coverable lines: 199
Total lines: 482
Line coverage: 77.3%
Branch coverage
71%
Covered branches: 40
Total branches: 56
Branch coverage: 71.4%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
|---|---|---|---|---|
| .ctor(...) | 50% | 2 | 2 | 100% |
| StartAsync() | 100% | 4 | 4 | 100% |
| StopAsync() | 75% | 4.43 | 4 | 70% |
| WatchTransactionAsync() | 100% | 1 | 1 | 100% |
| MonitorBlockchainAsync() | 28.57% | 69.38 | 14 | 34.38% |
| InitializeZmqSockets() | 100% | 1.05 | 1 | 63.64% |
| CleanupZmqSockets() | 50% | 2.21 | 2 | 62.5% |
| ProcessPendingBlocksAsync() | 100% | 4 | 4 | 100% |
| AddMissingBlocksToProcessAsync() | 87.5% | 8.05 | 8 | 90.91% |
| ProcessNewBlock() | 100% | 1.01 | 1 | 76.92% |
| ProcessBlock(...) | 100% | 2.04 | 2 | 78.57% |
| ConfirmTransaction(...) | 50% | 2 | 2 | 100% |
| CheckWatchedTransactionsForBlock(...) | 83.33% | 6.34 | 6 | 78.95% |
| CheckWatchedTransactionsDepth(...) | 100% | 7.33 | 6 | 66.67% |
| LoadPendingWatchedTransactionsAsync() | 100% | 2 | 2 | 100% |

File(s)

/home/runner/work/nlightning/nlightning/src/NLightning.Infrastructure.Bitcoin/Wallet/BlockchainMonitorService.cs

# | Line | Line coverage
 1using System.Collections.Concurrent;
 2using Microsoft.Extensions.DependencyInjection;
 3using Microsoft.Extensions.Logging;
 4using Microsoft.Extensions.Options;
 5using NBitcoin;
 6using NetMQ;
 7using NetMQ.Sockets;
 8
 9namespace NLightning.Infrastructure.Bitcoin.Wallet;
 10
 11using Domain.Bitcoin.Events;
 12using Domain.Bitcoin.Transactions.Models;
 13using Domain.Bitcoin.ValueObjects;
 14using Domain.Channels.ValueObjects;
 15using Domain.Crypto.ValueObjects;
 16using Domain.Node.Options;
 17using Domain.Persistence.Interfaces;
 18using Interfaces;
 19using Options;
 20
 21public class BlockchainMonitorService : IBlockchainMonitor
 22{
 23    private readonly BitcoinOptions _bitcoinOptions;
 24    private readonly IBitcoinWallet _bitcoinWallet;
 25    private readonly ILogger<BlockchainMonitorService> _logger;
 26    private readonly IServiceProvider _serviceProvider;
 27    private readonly Network _network;
 2828    private readonly SemaphoreSlim _newBlockSemaphore = new(1, 1);
 2829    private readonly SemaphoreSlim _blockBacklogSemaphore = new(1, 1);
 2830    private readonly ConcurrentDictionary<uint256, WatchedTransactionModel> _watchedTransactions = new();
 31#if NET9_0_OR_GREATER
 2832    private readonly OrderedDictionary<uint, Block> _blocksToProcess = new();
 33#else
 34    // TODO: Check if ordering is the same in .NET 8
 35    private readonly SortedDictionary<uint, Block> _blocksToProcess = new();
 36#endif
 37
 2838    private BlockchainState _blockchainState = new(0, Hash.Empty, DateTime.UtcNow);
 39    private CancellationTokenSource? _cts;
 40    private Task? _monitoringTask;
 41    private uint _lastProcessedBlockHeight;
 42    private SubscriberSocket? _blockSocket;
 43    // private SubscriberSocket? _transactionSocket;
 44
 45    public event EventHandler<NewBlockEventArgs>? OnNewBlockDetected;
 46    public event EventHandler<TransactionConfirmedEventArgs>? OnTransactionConfirmed;
 47
 2848    public BlockchainMonitorService(IOptions<BitcoinOptions> bitcoinOptions, IBitcoinWallet bitcoinWallet,
 2849                                    ILogger<BlockchainMonitorService> logger, IOptions<NodeOptions> nodeOptions,
 2850                                    IServiceProvider serviceProvider)
 51    {
 2852        _bitcoinOptions = bitcoinOptions.Value;
 2853        _bitcoinWallet = bitcoinWallet;
 2854        _logger = logger;
 2855        _serviceProvider = serviceProvider;
 2856        _network = Network.GetNetwork(nodeOptions.Value.BitcoinNetwork) ?? Network.Main;
 2857    }
 58
 59    public async Task StartAsync(CancellationToken cancellationToken)
 60    {
 1661        _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
 62
 1663        using var scope = _serviceProvider.CreateScope();
 1664        using var uow = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
 65
 66        // Load pending transactions
 1667        await LoadPendingWatchedTransactionsAsync(uow);
 68
 69        // Get the current state or create a new one if it doesn't exist
 1670        var currentBlockchainState = await uow.BlockchainStateDbRepository.GetStateAsync();
 1671        if (currentBlockchainState is null)
 72        {
 873            var lastProcessedHeight = await _bitcoinWallet.GetCurrentBlockHeightAsync();
 874            _logger.LogInformation("No blockchain state found, starting from height {Height}", lastProcessedHeight);
 75
 876            _blockchainState = new BlockchainState(0, Hash.Empty, DateTime.UtcNow);
 877            uow.BlockchainStateDbRepository.Add(_blockchainState);
 78        }
 79        else
 80        {
 881            _blockchainState = currentBlockchainState;
 882            _lastProcessedBlockHeight = _blockchainState.LastProcessedHeight;
 883            _logger.LogInformation("Starting blockchain monitoring at height {Height}, last block hash {LastBlockHash}",
 884                                   _lastProcessedBlockHeight, _blockchainState.LastProcessedBlockHash);
 85        }
 86
 87        // Get the current block height from the wallet
 1688        var currentBlockHeight = await _bitcoinWallet.GetCurrentBlockHeightAsync();
 89
 90        // Add the current block to the processing queue
 1691        var currentBlock = await _bitcoinWallet.GetBlockAsync(_lastProcessedBlockHeight);
 1692        if (currentBlock is not null)
 1293            _blocksToProcess[_lastProcessedBlockHeight] = currentBlock;
 94
 95        // Add missing blocks to the processing queue and process any pending blocks
 1696        await AddMissingBlocksToProcessAsync(currentBlockHeight);
 1697        await ProcessPendingBlocksAsync(uow);
 98
 1699        await uow.SaveChangesAsync();
 100
 101        // Initialize ZMQ sockets
 16102        InitializeZmqSockets();
 103
 104        // Start monitoring task
 16105        _monitoringTask = MonitorBlockchainAsync(_cts.Token);
 106
 16107        _logger.LogInformation("Blockchain monitor service started successfully");
 16108    }
 109
 110    public async Task StopAsync()
 111    {
 4112        if (_cts is null)
 113        {
 0114            throw new InvalidOperationException("Service is not running");
 115        }
 116
 4117        await _cts.CancelAsync();
 118
 4119        if (_monitoringTask is not null)
 120        {
 121            try
 122            {
 4123                await _monitoringTask;
 4124            }
 0125            catch (OperationCanceledException)
 126            {
 127                // Expected during cancellation
 0128            }
 129        }
 130
 4131        CleanupZmqSockets();
 4132    }
 133
 134    public async Task WatchTransactionAsync(ChannelId channelId, TxId txId, uint requiredDepth)
 135    {
 4136        _logger.LogInformation("Watching transaction {TxId} for {RequiredDepth} confirmations for channel {channelId}",
 4137                               txId, requiredDepth, channelId);
 138
 4139        using var scope = _serviceProvider.CreateScope();
 4140        using var uow = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
 141
 4142        var nBitcoinTxId = new uint256(txId);
 4143        var watchedTx = new WatchedTransactionModel(channelId, txId, requiredDepth);
 144
 4145        uow.WatchedTransactionDbRepository.Add(watchedTx);
 146
 4147        _watchedTransactions[nBitcoinTxId] = watchedTx;
 148
 4149        await uow.SaveChangesAsync();
 4150    }
 151
 152    // public Task WatchForRevocationAsync(TxId commitmentTxId, SignedTransaction penaltyTx)
 153    // {
 154    //     _logger.LogInformation("Watching for revocation of commitment transaction {CommitmentTxId}", commitmentTxId);
 155    //
 156    //     var nBitcoinTxId = new uint256(commitmentTxId);
 157    //     var revocationWatch = new RevocationWatch(nBitcoinTxId, Transaction.Load(penaltyTx.RawTxBytes, _network));
 158    //
 159    //     _revocationWatches.TryAdd(nBitcoinTxId, revocationWatch);
 160    //     return Task.CompletedTask;
 161    // }
 162
 163    private async Task MonitorBlockchainAsync(CancellationToken cancellationToken)
 164    {
 16165        _logger.LogInformation("Starting blockchain monitoring loop");
 166
 167        try
 168        {
 74169            while (!cancellationToken.IsCancellationRequested)
 170            {
 171                try
 172                {
 173                    // Check for new blocks
 74174                    if (_blockSocket != null &&
 74175                        _blockSocket.TryReceiveFrameString(TimeSpan.FromMilliseconds(100), out var topic))
 176                    {
 0177                        if (topic == "rawblock" && _blockSocket.TryReceiveFrameBytes(out var blockHashBytes))
 178                        {
 179                            try
 180                            {
 181                                // One at a time
 0182                                await _newBlockSemaphore.WaitAsync(cancellationToken);
 0183                                var block = Block.Load(blockHashBytes, _network);
 0184                                var coinbaseHeight = block.GetCoinbaseHeight();
 0185                                if (!coinbaseHeight.HasValue)
 186                                {
 187                                    // Get the current height from the wallet
 0188                                    var currentHeight = await _bitcoinWallet.GetCurrentBlockHeightAsync();
 189
 190                                    // Get the block from the wallet
 0191                                    var blockAtHeight = await _bitcoinWallet.GetBlockAsync(currentHeight);
 0192                                    if (blockAtHeight is null)
 193                                    {
 0194                                        _logger.LogError("Failed to retrieve block at height {Height}", currentHeight);
 0195                                        return;
 196                                    }
 197
 0198                                    coinbaseHeight = (int)currentHeight;
 199                                }
 200
 0201                                await ProcessNewBlock(block, (uint)coinbaseHeight);
 0202                            }
 203                            finally
 204                            {
 0205                                _newBlockSemaphore.Release();
 206                            }
 207                        }
 0208                    }
 209
 210                    // TODO: Check for new transactions
 211                    // if (_transactionSocket != null &&
 212                    //     _transactionSocket.TryReceiveFrameString(TimeSpan.FromMilliseconds(100), out var txTopic))
 213                    // {
 214                    //     if (txTopic == "rawtx" && _transactionSocket.TryReceiveFrameBytes(out var rawTxBytes))
 215                    //     {
 216                    //         await ProcessNewTransaction(rawTxBytes);
 217                    //     }
 218                    // }
 219
 220                    // Small delay to prevent CPU spinning
 68221                    await Task.Delay(50, cancellationToken);
 58222                }
 4223                catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
 224                {
 0225                    _logger.LogError(ex, "Error in blockchain monitoring loop");
 0226                    await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
 227                }
 228            }
 0229        }
 4230        catch (OperationCanceledException)
 231        {
 4232            _logger.LogInformation("Blockchain monitoring loop cancelled");
 4233        }
 0234        catch (Exception ex)
 235        {
 0236            _logger.LogError(ex, "Fatal error in blockchain monitoring loop");
 0237        }
 4238    }
 239
 240    private void InitializeZmqSockets()
 241    {
 242        try
 243        {
 244            // Subscribe to new blocks
 16245            _blockSocket = new SubscriberSocket();
 16246            _blockSocket.Connect($"tcp://{_bitcoinOptions.ZmqHost}:{_bitcoinOptions.ZmqBlockPort}");
 16247            _blockSocket.Subscribe("rawblock");
 248
 249            // // Subscribe to new transactions (for mempool monitoring)
 250            // _transactionSocket = new SubscriberSocket();
 251            // _transactionSocket.Connect($"tcp://{_bitcoinOptions.ZmqHost}:{_bitcoinOptions.ZmqTxPort}");
 252            // _transactionSocket.Subscribe("rawtx");
 253
 16254            _logger.LogInformation("ZMQ sockets initialized - Block: {BlockPort}, Tx: {TxPort}",
 16255                                   _bitcoinOptions.ZmqBlockPort, _bitcoinOptions.ZmqTxPort);
 16256        }
 0257        catch (Exception ex)
 258        {
 0259            _logger.LogError(ex, "Failed to initialize ZMQ sockets");
 0260            CleanupZmqSockets();
 0261            throw;
 262        }
 16263    }
 264
 265    private void CleanupZmqSockets()
 266    {
 267        try
 268        {
 4269            _blockSocket?.Dispose();
 4270            _blockSocket = null;
 271
 272            // _transactionSocket?.Dispose();
 273            // _transactionSocket = null;
 274
 4275            _logger.LogDebug("ZMQ sockets cleaned up");
 4276        }
 0277        catch (Exception ex)
 278        {
 0279            _logger.LogError(ex, "Error cleaning up ZMQ sockets");
 0280        }
 4281    }
 282
 283    private async Task ProcessPendingBlocksAsync(IUnitOfWork uow)
 284    {
 285        try
 286        {
 20287            await _blockBacklogSemaphore.WaitAsync();
 288
 504289            while (_blocksToProcess.Count > 0)
 290            {
 484291                var blockKvp = _blocksToProcess.First();
 484292                if (blockKvp.Key <= _lastProcessedBlockHeight)
 12293                    _logger.LogWarning("Possible reorg detected: Block {Height} is already processed.", blockKvp.Key);
 294
 484295                ProcessBlock(blockKvp.Value, blockKvp.Key, uow);
 296            }
 20297        }
 298        finally
 299        {
 20300            _blockBacklogSemaphore.Release();
 301        }
 20302    }
 303
 304    private async Task AddMissingBlocksToProcessAsync(uint currentHeight)
 305    {
 20306        var lastProcessedHeight = _lastProcessedBlockHeight + 1;
 20307        if (currentHeight > lastProcessedHeight)
 308        {
 12309            _logger.LogWarning("Processing missed blocks from height {LastProcessedHeight} to {CurrentHeight}",
 12310                               lastProcessedHeight, currentHeight);
 311
 960312            for (var height = lastProcessedHeight; height < currentHeight; height++)
 313            {
 468314                if (_blocksToProcess.ContainsKey(height))
 315                    continue;
 316
 317                // Add missing block to process queue
 468318                var blockAtHeight = await _bitcoinWallet.GetBlockAsync(height);
 468319                if (blockAtHeight is not null)
 320                {
 468321                    _blocksToProcess[height] = blockAtHeight;
 322                }
 323                else
 324                {
 0325                    _logger.LogError("Missing block at height {Height}", height);
 326                }
 327            }
 328        }
 20329    }
 330
 331    private async Task ProcessNewBlock(Block block, uint currentHeight)
 332    {
 4333        using var scope = _serviceProvider.CreateScope();
 4334        using var uow = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
 335
 4336        var blockHash = block.GetHash();
 337
 338        try
 339        {
 4340            _logger.LogDebug("Processing block at height {blockHeight}: {BlockHash}", currentHeight, blockHash);
 341
 342            // Check for missed blocks first
 4343            await AddMissingBlocksToProcessAsync(currentHeight);
 344
 345            // Store the current block for processing
 4346            _blocksToProcess[currentHeight] = block;
 347
 348            // Process missing blocks
 4349            await ProcessPendingBlocksAsync(uow);
 4350        }
 0351        catch (Exception ex)
 352        {
 0353            _logger.LogError(ex, "Error processing new block {BlockHash}", blockHash);
 0354        }
 355
 4356        await uow.SaveChangesAsync();
 4357    }
 358
 359    // TODO: Check for revocation transactions in mempool
 360    // private async Task ProcessNewTransaction(byte[] rawTxBytes)
 361    // {
 362    //     try
 363    //     {
 364    //         var transaction = Transaction.Load(rawTxBytes, Network.Main);
 365    //     }
 366    //     catch (Exception ex)
 367    //     {
 368    //         _logger.LogError(ex, "Error processing new transaction from mempool");
 369    //     }
 370    // }
 371
 372    private void ProcessBlock(Block block, uint height, IUnitOfWork uow)
 373    {
 374        try
 375        {
 484376            var blockHash = block.GetHash();
 377
 484378            _logger.LogDebug("Processing block {Height} with {TxCount} transactions", height, block.Transactions.Count);
 379
 380            // Notify listeners of the new block
 484381            OnNewBlockDetected?.Invoke(this, new NewBlockEventArgs(height, blockHash.ToBytes()));
 382
 383            // Check if watched transactions are included in this block
 484384            CheckWatchedTransactionsForBlock(block.Transactions, height, uow);
 385
 386            // Update blockchain state
 484387            _blockchainState.UpdateState(blockHash.ToBytes(), height);
 484388            uow.BlockchainStateDbRepository.Update(_blockchainState);
 389
 484390            _blocksToProcess.Remove(height);
 391
 392            // Update our internal state
 484393            _lastProcessedBlockHeight = height;
 394
 395            // Check watched for all transactions' depth
 484396            CheckWatchedTransactionsDepth(uow);
 484397        }
 0398        catch (Exception ex)
 399        {
 0400            _logger.LogError(ex, "Error processing block at height {Height}", height);
 0401        }
 484402    }
 403
 404    private void ConfirmTransaction(uint blockHeight, IUnitOfWork uow, WatchedTransactionModel watchedTransaction)
 405    {
 4406        _logger.LogInformation(
 4407            "Transaction {TxId} reached required depth of {depth} confirmations at block {blockHeight}",
 4408            watchedTransaction.TransactionId, watchedTransaction.RequiredDepth, blockHeight);
 409
 4410        watchedTransaction.MarkAsCompleted();
 4411        uow.WatchedTransactionDbRepository.Update(watchedTransaction);
 4412        OnTransactionConfirmed?.Invoke(
 4413            this, new TransactionConfirmedEventArgs(watchedTransaction, blockHeight));
 414
 4415        _watchedTransactions.TryRemove(new uint256(watchedTransaction.TransactionId), out _);
 4416    }
 417
 418    private void CheckWatchedTransactionsForBlock(List<Transaction> blockTransactions, uint blockHeight,
 419                                                  IUnitOfWork uow)
 420    {
 488421        _logger.LogDebug(
 488422            "Checking {watchedTransactionCount} watched transactions for block {height} with {TxCount} transactions",
 488423            _watchedTransactions.Count, blockHeight, blockTransactions.Count);
 424
 488425        ushort index = 0;
 984426        foreach (var transaction in blockTransactions)
 427        {
 4428            var txId = transaction.GetHash();
 429
 4430            if (!_watchedTransactions.TryGetValue(txId, out var watchedTransaction))
 431                continue;
 432
 4433            _logger.LogInformation("Transaction {TxId} found in block at height {Height}", txId, blockHeight);
 434
 435            try
 436            {
 437                // Update first seen height
 4438                watchedTransaction.SetHeightAndIndex(blockHeight, index);
 4439                uow.WatchedTransactionDbRepository.Update(watchedTransaction);
 440
 4441                if (watchedTransaction.RequiredDepth == 0)
 0442                    ConfirmTransaction(blockHeight, uow, watchedTransaction);
 4443            }
 0444            catch (Exception ex)
 445            {
 0446                _logger.LogError(ex, "Error checking confirmations for transaction {TxId}", txId);
 0447            }
 448            finally
 449            {
 4450                index++;
 4451            }
 452        }
 488453    }
 454
 455    private void CheckWatchedTransactionsDepth(IUnitOfWork uow)
 456    {
 1064457        foreach (var (txId, watchedTransaction) in _watchedTransactions)
 458        {
 459            try
 460            {
 44461                var confirmations = _lastProcessedBlockHeight - watchedTransaction.FirstSeenAtHeight;
 44462                if (confirmations >= watchedTransaction.RequiredDepth)
 4463                    ConfirmTransaction(_lastProcessedBlockHeight, uow, watchedTransaction);
 44464            }
 0465            catch (Exception ex)
 466            {
 0467                _logger.LogError(ex, "Error checking confirmations for transaction {TxId}", txId);
 0468            }
 469        }
 488470    }
 471
 472    private async Task LoadPendingWatchedTransactionsAsync(IUnitOfWork uow)
 473    {
 16474        _logger.LogInformation("Loading watched transactions from database");
 475
 16476        var watchedTransactions = await uow.WatchedTransactionDbRepository.GetAllPendingAsync();
 40477        foreach (var watchedTransaction in watchedTransactions)
 478        {
 4479            _watchedTransactions[new uint256(watchedTransaction.TransactionId)] = watchedTransaction;
 480        }
 16481    }
 482}