Skip to content

Commit 1ce0a2c

Browse files
committed
Implement peer synchronization and management features
- Added a new Store implementation for managing peer states with tests. - Introduced a Supervisor to manage the lifecycle of the peer sync loop, including restart logic and error handling. - Created tests for the Supervisor to ensure it respects maximum restart limits and stops on cancellation. - Removed outdated PollMessage and related service code, consolidating functionality into the new Store and Supervisor. - Updated tests to reflect the removal of the old Poll service and ensure new functionality is covered.
1 parent 569b7be commit 1ce0a2c

27 files changed

+3462
-702
lines changed

clightning/clightning.go

Lines changed: 38 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,16 @@ import (
1010
"os"
1111
"time"
1212

13-
"github.com/elementsproject/peerswap/log"
14-
"github.com/elementsproject/peerswap/premium"
15-
1613
"github.com/btcsuite/btcd/chaincfg"
1714
"github.com/elementsproject/glightning/gbitcoin"
18-
"github.com/elementsproject/peerswap/onchain"
19-
2015
"github.com/elementsproject/glightning/glightning"
2116
"github.com/elementsproject/glightning/jrpc2"
2217
"github.com/elementsproject/peerswap/lightning"
18+
"github.com/elementsproject/peerswap/log"
2319
"github.com/elementsproject/peerswap/messages"
24-
"github.com/elementsproject/peerswap/poll"
20+
"github.com/elementsproject/peerswap/onchain"
21+
"github.com/elementsproject/peerswap/peersync"
22+
"github.com/elementsproject/peerswap/premium"
2523
"github.com/elementsproject/peerswap/swap"
2624
"github.com/elementsproject/peerswap/wallet"
2725
goerrors "github.com/go-errors/errors"
@@ -75,7 +73,8 @@ type ClightningClient struct {
7573
swaps *swap.SwapService
7674
requestedSwaps *swap.RequestedSwapsPrinter
7775
policy PolicyReloader
78-
pollService *poll.Service
76+
peerSync *peersync.PeerSync
77+
peerStore *peersync.Store
7978
ps *premium.Setting
8079

8180
gbitcoin *gbitcoin.Bitcoin
@@ -326,14 +325,15 @@ func (cl *ClightningClient) GetPreimage() (lightning.Preimage, error) {
326325
func (cl *ClightningClient) SetupClients(liquidWallet wallet.Wallet,
327326
swaps *swap.SwapService,
328327
policy PolicyReloader, requestedSwaps *swap.RequestedSwapsPrinter,
329-
bitcoin *gbitcoin.Bitcoin, bitcoinChain *onchain.BitcoinOnChain, pollService *poll.Service,
328+
bitcoin *gbitcoin.Bitcoin, bitcoinChain *onchain.BitcoinOnChain, peerSync *peersync.PeerSync, peerStore *peersync.Store,
330329
ps *premium.Setting) {
331330
cl.liquidWallet = liquidWallet
332331
cl.requestedSwaps = requestedSwaps
333332
cl.swaps = swaps
334333
cl.policy = policy
335334
cl.gbitcoin = bitcoin
336-
cl.pollService = pollService
335+
cl.peerSync = peerSync
336+
cl.peerStore = peerStore
337337
cl.bitcoinChain = bitcoinChain
338338
cl.ps = ps
339339
if cl.bitcoinChain != nil {
@@ -520,13 +520,23 @@ func (cl *ClightningClient) isPeerConnected(nodeId string) bool {
520520
}
521521

522522
// peerRunsPeerSwap returns true if the peer with peerId is listed in the
523-
// pollService.
523+
// peerRunsPeerSwap returns true if the peersync store has a compatible capability for the peer.
524524
func (cl *ClightningClient) peerRunsPeerSwap(peerId string) bool {
525-
pollInfo, err := cl.pollService.GetPollFrom(peerId)
526-
if err == nil && pollInfo != nil {
527-
return true
525+
if cl.peerStore == nil {
526+
return false
527+
}
528+
id, err := peersync.NewPeerID(peerId)
529+
if err != nil {
530+
return false
531+
}
532+
peer, err := cl.peerStore.GetPeerState(id)
533+
if err != nil || peer == nil {
534+
return false
535+
}
536+
if peer.Capability() == nil {
537+
return false
528538
}
529-
return false
539+
return peer.IsCompatibleWith(peersync.NewVersion(swap.PEERSWAP_PROTOCOL_VERSION))
530540
}
531541

532542
// This is called after the Plugin starts up successfully
@@ -549,10 +559,20 @@ func (cl *ClightningClient) OnConnect(connectEvent *glightning.ConnectEvent) {
549559
go func() {
550560
for {
551561
time.Sleep(10 * time.Second)
552-
if cl.pollService != nil {
553-
cl.pollService.RequestPoll(
554-
lo.Ternary(connectEvent.PeerId != "",
555-
connectEvent.PeerId, connectEvent.Conn.PeerId))
562+
if cl.peerSync != nil {
563+
peerIDValue := lo.Ternary(connectEvent.PeerId != "",
564+
connectEvent.PeerId, connectEvent.Conn.PeerId)
565+
if peerIDValue == "" {
566+
return
567+
}
568+
peerID, err := peersync.NewPeerID(peerIDValue)
569+
if err != nil {
570+
log.Debugf("invalid peer id for request poll: %v", err)
571+
return
572+
}
573+
if err := cl.peerSync.RequestPoll(cl.ctx, peerID); err != nil {
574+
log.Debugf("failed to request poll for %s: %v", peerIDValue, err)
575+
}
556576
return
557577
}
558578
}

clightning/clightning_commands.go

Lines changed: 49 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package clightning
22

33
import (
4+
"context"
45
"crypto/rand"
56
"encoding/hex"
67
"errors"
@@ -12,6 +13,7 @@ import (
1213

1314
"github.com/elementsproject/peerswap/log"
1415
"github.com/elementsproject/peerswap/peerswaprpc"
16+
"github.com/elementsproject/peerswap/peersync"
1517
"github.com/elementsproject/peerswap/premium"
1618

1719
"github.com/elementsproject/glightning/glightning"
@@ -559,15 +561,28 @@ func (l *ListPeers) Call() (jrpc2.Result, error) {
559561
fundingChannels[channel.ShortChannelId] = channel
560562
}
561563

562-
// get polls
563-
polls, err := l.cl.pollService.GetCompatiblePolls()
564+
peerStates, err := l.cl.peerStore.GetAllPeerStates()
564565
if err != nil {
565566
return nil, err
566567
}
567568

569+
compatible := make(map[string]*peersync.Peer)
570+
targetVersion := peersync.NewVersion(swap.PEERSWAP_PROTOCOL_VERSION)
571+
for _, state := range peerStates {
572+
if state == nil || state.Capability() == nil {
573+
continue
574+
}
575+
if !state.IsCompatibleWith(targetVersion) {
576+
continue
577+
}
578+
compatible[state.ID().String()] = state
579+
}
580+
568581
peerSwappers := []*peerswaprpc.PeerSwapPeer{}
569582
for _, peer := range peers {
570-
if p, ok := polls[peer.Id]; ok {
583+
peerState, ok := compatible[peer.Id]
584+
if ok {
585+
capability := peerState.Capability()
571586
swaps, err := l.cl.swaps.ListSwapsByPeer(peer.Id)
572587
if err != nil {
573588
return nil, err
@@ -603,8 +618,8 @@ func (l *ListPeers) Call() (jrpc2.Result, error) {
603618

604619
peerSwapPeer := &peerswaprpc.PeerSwapPeer{
605620
NodeId: peer.Id,
606-
SwapsAllowed: p.PeerAllowed,
607-
SupportedAssets: p.Assets,
621+
SwapsAllowed: capability.IsAllowed(),
622+
SupportedAssets: assetsFromCapability(capability),
608623
AsSender: &peerswaprpc.SwapStats{
609624
SwapsOut: SenderSwapsOut,
610625
SwapsIn: SenderSwapsIn,
@@ -624,22 +639,22 @@ func (l *ListPeers) Call() (jrpc2.Result, error) {
624639
{
625640
Asset: peerswaprpc.AssetType_BTC,
626641
Operation: peerswaprpc.OperationType_SWAP_IN,
627-
PremiumRatePpm: p.BTCSwapInPremiumRatePPM,
642+
PremiumRatePpm: ppmFromCapability(capability, premium.BTC, premium.SwapIn),
628643
},
629644
{
630645
Asset: peerswaprpc.AssetType_BTC,
631646
Operation: peerswaprpc.OperationType_SWAP_OUT,
632-
PremiumRatePpm: p.BTCSwapOutPremiumRatePPM,
647+
PremiumRatePpm: ppmFromCapability(capability, premium.BTC, premium.SwapOut),
633648
},
634649
{
635650
Asset: peerswaprpc.AssetType_LBTC,
636651
Operation: peerswaprpc.OperationType_SWAP_IN,
637-
PremiumRatePpm: p.LBTCSwapInPremiumRatePPM,
652+
PremiumRatePpm: ppmFromCapability(capability, premium.LBTC, premium.SwapIn),
638653
},
639654
{
640655
Asset: peerswaprpc.AssetType_LBTC,
641656
Operation: peerswaprpc.OperationType_SWAP_OUT,
642-
PremiumRatePpm: p.LBTCSwapOutPremiumRatePPM,
657+
PremiumRatePpm: ppmFromCapability(capability, premium.LBTC, premium.SwapOut),
643658
},
644659
},
645660
},
@@ -672,6 +687,27 @@ func (l *ListPeers) Call() (jrpc2.Result, error) {
672687
return peerSwappers, nil
673688
}
674689

690+
func assetsFromCapability(capability *peersync.PeerCapability) []string {
691+
assets := capability.SupportedAssets()
692+
res := make([]string, 0, len(assets))
693+
for _, asset := range assets {
694+
res = append(res, asset.String())
695+
}
696+
return res
697+
}
698+
699+
func ppmFromCapability(
700+
capability *peersync.PeerCapability,
701+
asset premium.AssetType,
702+
operation premium.OperationType,
703+
) int64 {
704+
rate := capability.GetPremiumRate(asset, operation)
705+
if rate == nil {
706+
return 0
707+
}
708+
return rate.Value()
709+
}
710+
675711
func channelActive(state string) bool {
676712
return state == "CHANNELD_NORMAL"
677713
}
@@ -754,8 +790,10 @@ func (c ReloadPolicyFile) Call() (jrpc2.Result, error) {
754790
if err != nil {
755791
return nil, err
756792
}
757-
// Resend poll
758-
c.cl.pollService.PollAllPeers()
793+
// Resend capability updates
794+
if c.cl.peerSync != nil {
795+
c.cl.peerSync.ForcePollAllPeers(context.Background())
796+
}
759797
return c.cl.policy.Get(), nil
760798
}
761799

cmd/peerswap-plugin/main.go

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ import (
2626
"github.com/elementsproject/peerswap/clightning"
2727
"github.com/elementsproject/peerswap/messages"
2828
"github.com/elementsproject/peerswap/onchain"
29+
"github.com/elementsproject/peerswap/peersync"
2930
"github.com/elementsproject/peerswap/policy"
30-
"github.com/elementsproject/peerswap/poll"
3131
"github.com/elementsproject/peerswap/premium"
3232
"github.com/elementsproject/peerswap/swap"
3333
"github.com/elementsproject/peerswap/txwatcher"
@@ -372,16 +372,47 @@ func run(ctx context.Context, lightningPlugin *clightning.ClightningClient) erro
372372
return err
373373
}
374374

375-
pollStore, err := poll.NewStore(swapDb)
375+
peerSyncDBPath := filepath.Join(config.PeerswapDir, "peersync.db")
376+
peerStore, err := peersync.NewStore(peerSyncDBPath)
376377
if err != nil {
377378
return err
378379
}
379-
pollService := poll.NewService(1*time.Hour, 2*time.Hour, pollStore, lightningPlugin, pol, lightningPlugin, supportedAssets, ps)
380-
pollService.Start()
381-
defer pollService.Stop()
380+
defer func() {
381+
if closeErr := peerStore.Close(); closeErr != nil {
382+
log.Infof("peersync store close failed: %v", closeErr)
383+
}
384+
}()
385+
386+
nodeID, err := peersync.NewPeerID(lightningPlugin.GetNodeId())
387+
if err != nil {
388+
return err
389+
}
390+
391+
// Use the peersync CLN lightning adapter; it self-registers message handler
392+
clnAdapter := peersync.NewClnLightningAdapter(lightningPlugin)
393+
peerSync := peersync.NewPeerSync(nodeID, peerStore, clnAdapter, pol, supportedAssets, ps)
394+
395+
if err := peerSync.Start(ctx); err != nil {
396+
return err
397+
}
398+
defer func() {
399+
if stopErr := peerSync.Stop(); stopErr != nil {
400+
log.Infof("peersync stop failed: %v", stopErr)
401+
}
402+
}()
382403

383404
sp := swap.NewRequestedSwapsPrinter(requestedSwapStore)
384-
lightningPlugin.SetupClients(liquidRpcWallet, swapService, pol, sp, bitcoinCli, bitcoinOnChainService, pollService, ps)
405+
lightningPlugin.SetupClients(
406+
liquidRpcWallet,
407+
swapService,
408+
pol,
409+
sp,
410+
bitcoinCli,
411+
bitcoinOnChainService,
412+
peerSync,
413+
peerStore,
414+
ps,
415+
)
385416

386417
// We are ready to accept and handle requests.
387418
// FIXME: Once we reworked the recovery service (non-blocking) we want to

0 commit comments

Comments
 (0)