Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 23 additions & 23 deletions clightning/clightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,16 @@ import (
"os"
"time"

"github.com/elementsproject/peerswap/log"
"github.com/elementsproject/peerswap/premium"

"github.com/btcsuite/btcd/chaincfg"
"github.com/elementsproject/glightning/gbitcoin"
"github.com/elementsproject/peerswap/onchain"

"github.com/elementsproject/glightning/glightning"
"github.com/elementsproject/glightning/jrpc2"
"github.com/elementsproject/peerswap/lightning"
"github.com/elementsproject/peerswap/log"
"github.com/elementsproject/peerswap/messages"
"github.com/elementsproject/peerswap/poll"
"github.com/elementsproject/peerswap/onchain"
"github.com/elementsproject/peerswap/peersync"
"github.com/elementsproject/peerswap/premium"
"github.com/elementsproject/peerswap/swap"
"github.com/elementsproject/peerswap/wallet"
goerrors "github.com/go-errors/errors"
Expand Down Expand Up @@ -75,7 +73,8 @@ type ClightningClient struct {
swaps *swap.SwapService
requestedSwaps *swap.RequestedSwapsPrinter
policy PolicyReloader
pollService *poll.Service
peerSync *peersync.PeerSync
peerStore *peersync.Store
ps *premium.Setting

gbitcoin *gbitcoin.Bitcoin
Expand Down Expand Up @@ -326,14 +325,15 @@ func (cl *ClightningClient) GetPreimage() (lightning.Preimage, error) {
func (cl *ClightningClient) SetupClients(liquidWallet wallet.Wallet,
swaps *swap.SwapService,
policy PolicyReloader, requestedSwaps *swap.RequestedSwapsPrinter,
bitcoin *gbitcoin.Bitcoin, bitcoinChain *onchain.BitcoinOnChain, pollService *poll.Service,
bitcoin *gbitcoin.Bitcoin, bitcoinChain *onchain.BitcoinOnChain, peerSync *peersync.PeerSync, peerStore *peersync.Store,
ps *premium.Setting) {
cl.liquidWallet = liquidWallet
cl.requestedSwaps = requestedSwaps
cl.swaps = swaps
cl.policy = policy
cl.gbitcoin = bitcoin
cl.pollService = pollService
cl.peerSync = peerSync
cl.peerStore = peerStore
cl.bitcoinChain = bitcoinChain
cl.ps = ps
if cl.bitcoinChain != nil {
Expand Down Expand Up @@ -519,16 +519,6 @@ func (cl *ClightningClient) isPeerConnected(nodeId string) bool {
return peer.Connected
}

// peerRunsPeerSwap returns true if the peer with peerId is listed in the
// pollService.
func (cl *ClightningClient) peerRunsPeerSwap(peerId string) bool {
pollInfo, err := cl.pollService.GetPollFrom(peerId)
if err == nil && pollInfo != nil {
return true
}
return false
}

// This is called after the Plugin starts up successfully
func (cl *ClightningClient) onInit(plugin *glightning.Plugin, options map[string]glightning.Option, config *glightning.Config) {
cl.glightning.StartUp(config.RpcFile, config.LightningDir)
Expand All @@ -549,10 +539,20 @@ func (cl *ClightningClient) OnConnect(connectEvent *glightning.ConnectEvent) {
go func() {
for {
time.Sleep(10 * time.Second)
if cl.pollService != nil {
cl.pollService.RequestPoll(
lo.Ternary(connectEvent.PeerId != "",
connectEvent.PeerId, connectEvent.Conn.PeerId))
if cl.peerSync != nil {
peerIDValue := lo.Ternary(connectEvent.PeerId != "",
connectEvent.PeerId, connectEvent.Conn.PeerId)
if peerIDValue == "" {
return
}
peerID, err := peersync.NewPeerID(peerIDValue)
if err != nil {
log.Debugf("invalid peer id for request poll: %v", err)
return
}
if err := cl.peerSync.RequestPoll(cl.ctx, peerID); err != nil {
log.Debugf("failed to request poll for %s: %v", peerIDValue, err)
}
return
}
}
Expand Down
108 changes: 26 additions & 82 deletions clightning/clightning_commands.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package clightning

import (
"context"
"crypto/rand"
"encoding/hex"
"errors"
Expand All @@ -12,6 +13,8 @@ import (

"github.com/elementsproject/peerswap/log"
"github.com/elementsproject/peerswap/peerswaprpc"
"github.com/elementsproject/peerswap/peersync"
"github.com/elementsproject/peerswap/peersync/format"
"github.com/elementsproject/peerswap/premium"

"github.com/elementsproject/glightning/glightning"
Expand Down Expand Up @@ -225,8 +228,10 @@ func (l *SwapOut) Call() (jrpc2.Result, error) {
}

// Skip this check when `force` is set.
if !l.Force && !l.cl.peerRunsPeerSwap(fundingChannels.Id) {
return nil, fmt.Errorf("peer does not run peerswap")
if !l.Force {
if l.cl.peerSync == nil || !l.cl.peerSync.HasCompatiblePeer(fundingChannels.Id) {
return nil, fmt.Errorf("peer does not run peerswap")
}
}

if !l.cl.isPeerConnected(fundingChannels.Id) {
Expand Down Expand Up @@ -345,8 +350,10 @@ func (l *SwapIn) Call() (jrpc2.Result, error) {
}

// Skip this check when `force` is set.
if !l.Force && !l.cl.peerRunsPeerSwap(fundingChannels.Id) {
return nil, fmt.Errorf("peer does not run peerswap")
if !l.Force {
if l.cl.peerSync == nil || !l.cl.peerSync.HasCompatiblePeer(fundingChannels.Id) {
return nil, fmt.Errorf("peer does not run peerswap")
}
}

if !l.cl.isPeerConnected(fundingChannels.Id) {
Expand Down Expand Up @@ -559,91 +566,26 @@ func (l *ListPeers) Call() (jrpc2.Result, error) {
fundingChannels[channel.ShortChannelId] = channel
}

// get polls
polls, err := l.cl.pollService.GetCompatiblePolls()
if err != nil {
return nil, err
compatible := make(map[string]*peersync.Peer)
if l.cl.peerSync != nil {
compatible, err = l.cl.peerSync.CompatiblePeers()
if err != nil {
return nil, err
}
}

peerSwappers := []*peerswaprpc.PeerSwapPeer{}
for _, peer := range peers {
if p, ok := polls[peer.Id]; ok {
peerState, ok := compatible[peer.Id]
if ok {
capability := peerState.Capability()
swaps, err := l.cl.swaps.ListSwapsByPeer(peer.Id)
if err != nil {
return nil, err
}

var paidFees uint64
var ReceiverSwapsOut, ReceiverSwapsIn, ReceiverSatsOut, ReceiverSatsIn uint64
var SenderSwapsOut, SenderSwapsIn, SenderSatsOut, SenderSatsIn uint64
for _, s := range swaps {
// We only list successful swaps. They all end in an
// State_ClaimedPreimage state.
if s.Current == swap.State_ClaimedPreimage {
if s.Role == swap.SWAPROLE_SENDER {
paidFees += s.Data.OpeningTxFee
if s.Type == swap.SWAPTYPE_OUT {
SenderSwapsOut++
SenderSatsOut += s.Data.GetAmount()
} else {
SenderSwapsIn++
SenderSatsIn += s.Data.GetAmount()
}
} else {
if s.Type == swap.SWAPTYPE_OUT {
ReceiverSwapsOut++
ReceiverSatsOut += s.Data.GetAmount()
} else {
ReceiverSwapsIn++
ReceiverSatsIn += s.Data.GetAmount()
}
}
}
}

peerSwapPeer := &peerswaprpc.PeerSwapPeer{
NodeId: peer.Id,
SwapsAllowed: p.PeerAllowed,
SupportedAssets: p.Assets,
AsSender: &peerswaprpc.SwapStats{
SwapsOut: SenderSwapsOut,
SwapsIn: SenderSwapsIn,
SatsOut: SenderSatsOut,
SatsIn: SenderSatsIn,
},
AsReceiver: &peerswaprpc.SwapStats{
SwapsOut: ReceiverSwapsOut,
SwapsIn: ReceiverSwapsIn,
SatsOut: ReceiverSatsOut,
SatsIn: ReceiverSatsIn,
},
PaidFee: paidFees,
PeerPremium: &peerswaprpc.PeerPremium{
NodeId: peer.Id,
Rates: []*peerswaprpc.PremiumRate{
{
Asset: peerswaprpc.AssetType_BTC,
Operation: peerswaprpc.OperationType_SWAP_IN,
PremiumRatePpm: p.BTCSwapInPremiumRatePPM,
},
{
Asset: peerswaprpc.AssetType_BTC,
Operation: peerswaprpc.OperationType_SWAP_OUT,
PremiumRatePpm: p.BTCSwapOutPremiumRatePPM,
},
{
Asset: peerswaprpc.AssetType_LBTC,
Operation: peerswaprpc.OperationType_SWAP_IN,
PremiumRatePpm: p.LBTCSwapInPremiumRatePPM,
},
{
Asset: peerswaprpc.AssetType_LBTC,
Operation: peerswaprpc.OperationType_SWAP_OUT,
PremiumRatePpm: p.LBTCSwapOutPremiumRatePPM,
},
},
},
}
view := format.BuildPeerView(peer.Id, capability, swaps)
peerSwapPeer := peerswaprpc.NewPeerSwapPeerFromView(view)
channels, err := l.cl.glightning.ListChannelsBySource(peer.Id)
if err != nil {
return nil, err
Expand Down Expand Up @@ -754,8 +696,10 @@ func (c ReloadPolicyFile) Call() (jrpc2.Result, error) {
if err != nil {
return nil, err
}
// Resend poll
c.cl.pollService.PollAllPeers()
// Resend capability updates
if c.cl.peerSync != nil {
c.cl.peerSync.ForcePollAllPeers(context.Background())
}
return c.cl.policy.Get(), nil
}

Expand Down
51 changes: 45 additions & 6 deletions cmd/peerswap-plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/elementsproject/peerswap/log"
"github.com/elementsproject/peerswap/lwk"
"github.com/elementsproject/peerswap/version"
"github.com/thejerf/suture/v4"
"golang.org/x/sys/unix"

"github.com/vulpemventures/go-elements/network"
Expand All @@ -26,8 +27,8 @@ import (
"github.com/elementsproject/peerswap/clightning"
"github.com/elementsproject/peerswap/messages"
"github.com/elementsproject/peerswap/onchain"
"github.com/elementsproject/peerswap/peersync"
"github.com/elementsproject/peerswap/policy"
"github.com/elementsproject/peerswap/poll"
"github.com/elementsproject/peerswap/premium"
"github.com/elementsproject/peerswap/swap"
"github.com/elementsproject/peerswap/txwatcher"
Expand Down Expand Up @@ -372,16 +373,54 @@ func run(ctx context.Context, lightningPlugin *clightning.ClightningClient) erro
return err
}

pollStore, err := poll.NewStore(swapDb)
peerSyncDBPath := filepath.Join(config.PeerswapDir, "peersync.db")
peerStore, err := peersync.NewStore(peerSyncDBPath)
if err != nil {
return err
}
pollService := poll.NewService(1*time.Hour, 2*time.Hour, pollStore, lightningPlugin, pol, lightningPlugin, supportedAssets, ps)
pollService.Start()
defer pollService.Stop()
defer func() {
if closeErr := peerStore.Close(); closeErr != nil {
log.Infof("peersync store close failed: %v", closeErr)
}
}()

nodeID, err := peersync.NewPeerID(lightningPlugin.GetNodeId())
if err != nil {
return err
}

// Use the peersync CLN lightning adapter; it self-registers message handler
clnAdapter := peersync.NewClnLightningAdapter(lightningPlugin)
peerSync := peersync.NewPeerSync(nodeID, peerStore, clnAdapter, pol, supportedAssets, ps)

psSupervisor := suture.New("peersync", suture.Spec{
EventHook: func(e suture.Event) {
log.Infof("peersync supervisor event: %s", e)
},
})
psSupervisor.Add(peerSync)

peerSyncCtx, peerSyncCancel := context.WithCancel(ctx)
peerSyncErrCh := psSupervisor.ServeBackground(peerSyncCtx)
defer func() {
peerSyncCancel()
if err := <-peerSyncErrCh; err != nil && !errors.Is(err, context.Canceled) {
log.Infof("peersync supervisor exited: %v", err)
}
}()

sp := swap.NewRequestedSwapsPrinter(requestedSwapStore)
lightningPlugin.SetupClients(liquidRpcWallet, swapService, pol, sp, bitcoinCli, bitcoinOnChainService, pollService, ps)
lightningPlugin.SetupClients(
liquidRpcWallet,
swapService,
pol,
sp,
bitcoinCli,
bitcoinOnChainService,
peerSync,
peerStore,
ps,
)

// We are ready to accept and handle requests.
// FIXME: Once we reworked the recovery service (non-blocking) we want to
Expand Down
Loading
Loading