Skip to content

Commit 7762b4d

Browse files
authored
reduce the number of STM transactions (#1255)
* reduce the number of STM transactions * reduce STM more * refactor * remove more
1 parent 7d84572 commit 7762b4d

File tree

34 files changed

+495
-474
lines changed

34 files changed

+495
-474
lines changed

src/Simplex/FileTransfer/Agent.hs

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do
184184
cfg <- asks config
185185
forever $ do
186186
lift $ waitForWork doWork
187-
atomically $ assertAgentForeground c
187+
liftIO $ assertAgentForeground c
188188
runXFTPOperation cfg
189189
where
190190
runXFTPOperation :: AgentConfig -> AM ()
@@ -194,7 +194,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do
194194
(fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, digest, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, delay} : _}, approvedRelays) -> do
195195
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
196196
withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do
197-
atomically $ waitWhileSuspended c
197+
liftIO $ waitWhileSuspended c
198198
liftIO $ waitForUserNetwork c
199199
atomically $ incXFTPServerStat c userId srv downloadAttempts
200200
downloadFileChunk fc replica approvedRelays
@@ -205,7 +205,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do
205205
when (serverHostError e) $ notify c rcvFileEntityId $ RFWARN e
206206
liftIO $ closeXFTPServerClient c userId server digest
207207
withStore' c $ \db -> updateRcvChunkReplicaDelay db rcvChunkReplicaId replicaDelay
208-
atomically $ assertAgentForeground c
208+
liftIO $ assertAgentForeground c
209209
loop
210210
retryDone e = do
211211
atomically . incXFTPServerStat c userId srv $ case e of
@@ -221,7 +221,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do
221221
chunkSpec = XFTPRcvChunkSpec chunkPath chSize (unFileDigest digest)
222222
relChunkPath = fileTmpPath </> takeFileName chunkPath
223223
agentXFTPDownloadChunk c userId digest replica chunkSpec
224-
atomically $ waitUntilForeground c
224+
liftIO $ waitUntilForeground c
225225
(entityId, complete, progress) <- withStore c $ \db -> runExceptT $ do
226226
liftIO $ updateRcvFileChunkReceived db (rcvChunkReplicaId replica) rcvChunkId relChunkPath
227227
RcvFile {size = FileSize currentSize, chunks, redirect} <- ExceptT $ getRcvFile db rcvFileId
@@ -240,7 +240,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do
240240
where
241241
ipAddressProtected' :: AM Bool
242242
ipAddressProtected' = do
243-
cfg <- liftIO $ getNetworkConfig' c
243+
cfg <- liftIO $ getFastNetworkConfig c
244244
pure $ ipAddressProtected cfg srv
245245
receivedSize :: [RcvFileChunk] -> Int64
246246
receivedSize = foldl' (\sz ch -> sz + receivedChunkSize ch) 0
@@ -273,7 +273,7 @@ runXFTPRcvLocalWorker c Worker {doWork} = do
273273
cfg <- asks config
274274
forever $ do
275275
lift $ waitForWork doWork
276-
atomically $ assertAgentForeground c
276+
liftIO $ assertAgentForeground c
277277
runXFTPOperation cfg
278278
where
279279
runXFTPOperation :: AgentConfig -> AM ()
@@ -299,12 +299,12 @@ runXFTPRcvLocalWorker c Worker {doWork} = do
299299
Nothing -> do
300300
notify c rcvFileEntityId $ RFDONE fsSavePath
301301
lift $ forM_ tmpPath (removePath <=< toFSFilePath)
302-
atomically $ waitUntilForeground c
302+
liftIO $ waitUntilForeground c
303303
withStore' c (`updateRcvFileComplete` rcvFileId)
304304
Just RcvFileRedirect {redirectFileInfo, redirectDbId} -> do
305305
let RedirectFileInfo {size = redirectSize, digest = redirectDigest} = redirectFileInfo
306306
lift $ forM_ tmpPath (removePath <=< toFSFilePath)
307-
atomically $ waitUntilForeground c
307+
liftIO $ waitUntilForeground c
308308
withStore' c (`updateRcvFileComplete` rcvFileId)
309309
-- proceed with redirect
310310
yaml <- liftError (FILE . FILE_IO . show) (CF.readFile $ CryptoFile fsSavePath cfArgs) `agentFinally` (lift $ toFSFilePath fsSavePath >>= removePath)
@@ -392,7 +392,7 @@ runXFTPSndPrepareWorker c Worker {doWork} = do
392392
cfg <- asks config
393393
forever $ do
394394
lift $ waitForWork doWork
395-
atomically $ assertAgentForeground c
395+
liftIO $ assertAgentForeground c
396396
runXFTPOperation cfg
397397
where
398398
runXFTPOperation :: AgentConfig -> AM ()
@@ -454,17 +454,17 @@ runXFTPSndPrepareWorker c Worker {doWork} = do
454454
SndFileChunkReplica {server} : _ -> Right server
455455
createChunk :: Int -> SndFileChunk -> AM (ProtocolServer 'PXFTP)
456456
createChunk numRecipients' ch = do
457-
atomically $ assertAgentForeground c
457+
liftIO $ assertAgentForeground c
458458
(replica, ProtoServerWithAuth srv _) <- tryCreate
459459
withStore' c $ \db -> createSndFileReplica db ch replica
460460
pure srv
461461
where
462462
tryCreate = do
463463
usedSrvs <- newTVarIO ([] :: [XFTPServer])
464464
let AgentClient {xftpServers} = c
465-
userSrvCount <- length <$> atomically (TM.lookup userId xftpServers)
465+
userSrvCount <- liftIO $ length <$> TM.lookupIO userId xftpServers
466466
withRetryIntervalCount (riFast ri) $ \n _ loop -> do
467-
atomically $ waitWhileSuspended c
467+
liftIO $ waitWhileSuspended c
468468
liftIO $ waitForUserNetwork c
469469
let triedAllSrvs = n > userSrvCount
470470
createWithNextSrv usedSrvs
@@ -474,7 +474,7 @@ runXFTPSndPrepareWorker c Worker {doWork} = do
474474
retryLoop loop triedAllSrvs e = do
475475
flip catchAgentError (\_ -> pure ()) $ do
476476
when (triedAllSrvs && serverHostError e) $ notify c sndFileEntityId $ SFWARN e
477-
atomically $ assertAgentForeground c
477+
liftIO $ assertAgentForeground c
478478
loop
479479
createWithNextSrv usedSrvs = do
480480
deleted <- withStore' c $ \db -> getSndFileDeleted db sndFileId
@@ -494,7 +494,7 @@ runXFTPSndWorker c srv Worker {doWork} = do
494494
cfg <- asks config
495495
forever $ do
496496
lift $ waitForWork doWork
497-
atomically $ assertAgentForeground c
497+
liftIO $ assertAgentForeground c
498498
runXFTPOperation cfg
499499
where
500500
runXFTPOperation :: AgentConfig -> AM ()
@@ -504,7 +504,7 @@ runXFTPSndWorker c srv Worker {doWork} = do
504504
fc@SndFileChunk {userId, sndFileId, sndFileEntityId, filePrefixPath, digest, replicas = replica@SndFileChunkReplica {sndChunkReplicaId, server, delay} : _} -> do
505505
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
506506
withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do
507-
atomically $ waitWhileSuspended c
507+
liftIO $ waitWhileSuspended c
508508
liftIO $ waitForUserNetwork c
509509
atomically $ incXFTPServerStat c userId srv uploadAttempts
510510
uploadFileChunk cfg fc replica
@@ -515,7 +515,7 @@ runXFTPSndWorker c srv Worker {doWork} = do
515515
when (serverHostError e) $ notify c sndFileEntityId $ SFWARN e
516516
liftIO $ closeXFTPServerClient c userId server digest
517517
withStore' c $ \db -> updateSndChunkReplicaDelay db sndChunkReplicaId replicaDelay
518-
atomically $ assertAgentForeground c
518+
liftIO $ assertAgentForeground c
519519
loop
520520
retryDone e = do
521521
atomically $ incXFTPServerStat c userId srv uploadErrs
@@ -526,9 +526,9 @@ runXFTPSndWorker c srv Worker {doWork} = do
526526
fsFilePath <- lift $ toFSFilePath filePath
527527
unlessM (doesFileExist fsFilePath) $ throwE $ FILE NO_FILE
528528
let chunkSpec' = chunkSpec {filePath = fsFilePath} :: XFTPChunkSpec
529-
atomically $ assertAgentForeground c
529+
liftIO $ assertAgentForeground c
530530
agentXFTPUploadChunk c userId chunkDigest replica' chunkSpec'
531-
atomically $ waitUntilForeground c
531+
liftIO $ waitUntilForeground c
532532
sf@SndFile {sndFileEntityId, prefixPath, chunks} <- withStore c $ \db -> do
533533
updateSndChunkReplicaStatus db sndChunkReplicaId SFRSUploaded
534534
getSndFile db sndFileId
@@ -666,7 +666,7 @@ runXFTPDelWorker c srv Worker {doWork} = do
666666
cfg <- asks config
667667
forever $ do
668668
lift $ waitForWork doWork
669-
atomically $ assertAgentForeground c
669+
liftIO $ assertAgentForeground c
670670
runXFTPOperation cfg
671671
where
672672
runXFTPOperation :: AgentConfig -> AM ()
@@ -677,7 +677,7 @@ runXFTPDelWorker c srv Worker {doWork} = do
677677
processDeletedReplica replica@DeletedSndChunkReplica {deletedSndChunkReplicaId, userId, server, chunkDigest, delay} = do
678678
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
679679
withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do
680-
atomically $ waitWhileSuspended c
680+
liftIO $ waitWhileSuspended c
681681
liftIO $ waitForUserNetwork c
682682
atomically $ incXFTPServerStat c userId srv deleteAttempts
683683
deleteChunkReplica
@@ -688,7 +688,7 @@ runXFTPDelWorker c srv Worker {doWork} = do
688688
when (serverHostError e) $ notify c "" $ SFWARN e
689689
liftIO $ closeXFTPServerClient c userId server chunkDigest
690690
withStore' c $ \db -> updateDeletedSndChunkReplicaDelay db deletedSndChunkReplicaId replicaDelay
691-
atomically $ assertAgentForeground c
691+
liftIO $ assertAgentForeground c
692692
loop
693693
retryDone e = do
694694
atomically $ incXFTPServerStat c userId srv deleteErrs
@@ -703,7 +703,7 @@ delWorkerInternalError c deletedSndChunkReplicaId e = do
703703
withStore' c $ \db -> deleteDeletedSndChunkReplica db deletedSndChunkReplicaId
704704
notify c "" $ SFERR e
705705

706-
assertAgentForeground :: AgentClient -> STM ()
706+
assertAgentForeground :: AgentClient -> IO ()
707707
assertAgentForeground c = do
708708
throwWhenInactive c
709709
waitUntilForeground c

src/Simplex/FileTransfer/Client/Agent.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@ defaultXFTPClientAgentConfig =
5353
data XFTPClientAgentError = XFTPClientAgentError XFTPServer XFTPClientError
5454
deriving (Show, Exception)
5555

56-
newXFTPAgent :: XFTPClientAgentConfig -> STM XFTPClientAgent
56+
newXFTPAgent :: XFTPClientAgentConfig -> IO XFTPClientAgent
5757
newXFTPAgent config = do
58-
xftpClients <- TM.empty
58+
xftpClients <- TM.emptyIO
5959
pure XFTPClientAgent {xftpClients, config}
6060

6161
type ME a = ExceptT XFTPClientAgentError IO a

src/Simplex/FileTransfer/Client/Main.hs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ cliSendFileOpts SendOptions {filePath, outputDir, numRecipients, xftpServers, re
313313
pure (encPath, fdRcv, fdSnd, chunkSpecs, encSize)
314314
uploadFile :: TVar ChaChaDRG -> [XFTPChunkSpec] -> TVar [Int64] -> Int64 -> ExceptT CLIError IO [SentFileChunk]
315315
uploadFile g chunks uploadedChunks encSize = do
316-
a <- atomically $ newXFTPAgent defaultXFTPClientAgentConfig
316+
a <- liftIO $ newXFTPAgent defaultXFTPClientAgentConfig
317317
gen <- newTVarIO =<< liftIO newStdGen
318318
let xftpSrvs = fromMaybe defaultXFTPServers (nonEmpty xftpServers)
319319
srvs <- liftIO $ replicateM (length chunks) $ getXFTPServer gen xftpSrvs
@@ -429,7 +429,7 @@ cliReceiveFile ReceiveOptions {fileDescription, filePath, retryCount, tempPath,
429429
receive (ValidFileDescription FileDescription {size, digest, key, nonce, chunks}) = do
430430
encPath <- getEncPath tempPath "xftp"
431431
createDirectory encPath
432-
a <- atomically $ newXFTPAgent defaultXFTPClientAgentConfig
432+
a <- liftIO $ newXFTPAgent defaultXFTPClientAgentConfig
433433
liftIO $ printNoNewLine "Downloading file..."
434434
downloadedChunks <- newTVarIO []
435435
let srv FileChunk {replicas} = case replicas of
@@ -494,7 +494,7 @@ cliDeleteFile DeleteOptions {fileDescription, retryCount, yes} = do
494494
where
495495
deleteFile :: ValidFileDescription 'FSender -> ExceptT CLIError IO ()
496496
deleteFile (ValidFileDescription FileDescription {chunks}) = do
497-
a <- atomically $ newXFTPAgent defaultXFTPClientAgentConfig
497+
a <- liftIO $ newXFTPAgent defaultXFTPClientAgentConfig
498498
forM_ chunks $ deleteFileChunk a
499499
liftIO $ do
500500
printNoNewLine "File deleted!"

src/Simplex/FileTransfer/Server.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira
112112
Right pk' -> pure pk'
113113
Left e -> putStrLn ("servers has no valid key: " <> show e) >> exitFailure
114114
env <- ask
115-
sessions <- atomically TM.empty
115+
sessions <- liftIO TM.emptyIO
116116
let cleanup sessionId = atomically $ TM.delete sessionId sessions
117117
liftIO . runHTTP2Server started xftpPort defaultHTTP2BufferSize serverParams transportConfig inactiveClientExpiration cleanup $ \sessionId sessionALPN r sendResponse -> do
118118
reqBody <- getHTTP2Body r xftpBlockSize
@@ -576,7 +576,7 @@ incFileStat statSel = do
576576
saveServerStats :: M ()
577577
saveServerStats =
578578
asks (serverStatsBackupFile . config)
579-
>>= mapM_ (\f -> asks serverStats >>= atomically . getFileServerStatsData >>= liftIO . saveStats f)
579+
>>= mapM_ (\f -> asks serverStats >>= liftIO . getFileServerStatsData >>= liftIO . saveStats f)
580580
where
581581
saveStats f stats = do
582582
logInfo $ "saving server stats to file " <> T.pack f

src/Simplex/FileTransfer/Server/Env.hs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ module Simplex.FileTransfer.Server.Env where
1111

1212
import Control.Logger.Simple
1313
import Control.Monad
14-
import Control.Monad.IO.Unlift
1514
import Crypto.Random
1615
import Data.Int (Int64)
1716
import Data.List.NonEmpty (NonEmpty)
@@ -105,17 +104,17 @@ supportedXFTPhandshakes = ["xftp/1"]
105104

106105
newXFTPServerEnv :: XFTPServerConfig -> IO XFTPEnv
107106
newXFTPServerEnv config@XFTPServerConfig {storeLogFile, fileSizeQuota, caCertificateFile, certificateFile, privateKeyFile, transportConfig} = do
108-
random <- liftIO C.newRandom
109-
store <- atomically newFileStore
110-
storeLog <- liftIO $ mapM (`readWriteFileStore` store) storeLogFile
107+
random <- C.newRandom
108+
store <- newFileStore
109+
storeLog <- mapM (`readWriteFileStore` store) storeLogFile
111110
used <- countUsedStorage <$> readTVarIO (files store)
112111
atomically $ writeTVar (usedStorage store) used
113112
forM_ fileSizeQuota $ \quota -> do
114113
logInfo $ "Total / available storage: " <> tshow quota <> " / " <> tshow (quota - used)
115114
when (quota < used) $ logInfo "WARNING: storage quota is less than used storage, no files can be uploaded!"
116-
tlsServerParams <- liftIO $ loadTLSServerParams caCertificateFile certificateFile privateKeyFile (alpn transportConfig)
117-
Fingerprint fp <- liftIO $ loadFingerprint caCertificateFile
118-
serverStats <- atomically . newFileServerStats =<< liftIO getCurrentTime
115+
tlsServerParams <- loadTLSServerParams caCertificateFile certificateFile privateKeyFile (alpn transportConfig)
116+
Fingerprint fp <- loadFingerprint caCertificateFile
117+
serverStats <- newFileServerStats =<< getCurrentTime
119118
pure XFTPEnv {config, store, storeLog, random, tlsServerParams, serverIdentity = C.KeyHash fp, serverStats}
120119

121120
countUsedStorage :: M.Map k FileRec -> Int64

src/Simplex/FileTransfer/Server/Stats.hs

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -43,34 +43,34 @@ data FileServerStatsData = FileServerStatsData
4343
}
4444
deriving (Show)
4545

46-
newFileServerStats :: UTCTime -> STM FileServerStats
46+
newFileServerStats :: UTCTime -> IO FileServerStats
4747
newFileServerStats ts = do
48-
fromTime <- newTVar ts
49-
filesCreated <- newTVar 0
50-
fileRecipients <- newTVar 0
51-
filesUploaded <- newTVar 0
52-
filesExpired <- newTVar 0
53-
filesDeleted <- newTVar 0
48+
fromTime <- newTVarIO ts
49+
filesCreated <- newTVarIO 0
50+
fileRecipients <- newTVarIO 0
51+
filesUploaded <- newTVarIO 0
52+
filesExpired <- newTVarIO 0
53+
filesDeleted <- newTVarIO 0
5454
filesDownloaded <- newPeriodStats
55-
fileDownloads <- newTVar 0
56-
fileDownloadAcks <- newTVar 0
57-
filesCount <- newTVar 0
58-
filesSize <- newTVar 0
55+
fileDownloads <- newTVarIO 0
56+
fileDownloadAcks <- newTVarIO 0
57+
filesCount <- newTVarIO 0
58+
filesSize <- newTVarIO 0
5959
pure FileServerStats {fromTime, filesCreated, fileRecipients, filesUploaded, filesExpired, filesDeleted, filesDownloaded, fileDownloads, fileDownloadAcks, filesCount, filesSize}
6060

61-
getFileServerStatsData :: FileServerStats -> STM FileServerStatsData
61+
getFileServerStatsData :: FileServerStats -> IO FileServerStatsData
6262
getFileServerStatsData s = do
63-
_fromTime <- readTVar $ fromTime (s :: FileServerStats)
64-
_filesCreated <- readTVar $ filesCreated s
65-
_fileRecipients <- readTVar $ fileRecipients s
66-
_filesUploaded <- readTVar $ filesUploaded s
67-
_filesExpired <- readTVar $ filesExpired s
68-
_filesDeleted <- readTVar $ filesDeleted s
63+
_fromTime <- readTVarIO $ fromTime (s :: FileServerStats)
64+
_filesCreated <- readTVarIO $ filesCreated s
65+
_fileRecipients <- readTVarIO $ fileRecipients s
66+
_filesUploaded <- readTVarIO $ filesUploaded s
67+
_filesExpired <- readTVarIO $ filesExpired s
68+
_filesDeleted <- readTVarIO $ filesDeleted s
6969
_filesDownloaded <- getPeriodStatsData $ filesDownloaded s
70-
_fileDownloads <- readTVar $ fileDownloads s
71-
_fileDownloadAcks <- readTVar $ fileDownloadAcks s
72-
_filesCount <- readTVar $ filesCount s
73-
_filesSize <- readTVar $ filesSize s
70+
_fileDownloads <- readTVarIO $ fileDownloads s
71+
_fileDownloadAcks <- readTVarIO $ fileDownloadAcks s
72+
_filesCount <- readTVarIO $ filesCount s
73+
_filesSize <- readTVarIO $ filesSize s
7474
pure FileServerStatsData {_fromTime, _filesCreated, _fileRecipients, _filesUploaded, _filesExpired, _filesDeleted, _filesDownloaded, _fileDownloads, _fileDownloadAcks, _filesCount, _filesSize}
7575

7676
setFileServerStats :: FileServerStats -> FileServerStatsData -> STM ()

src/Simplex/FileTransfer/Server/Store.hs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,11 @@ instance StrEncoding FileRecipient where
5555
strEncode (FileRecipient rId rKey) = strEncode rId <> ":" <> strEncode rKey
5656
strP = FileRecipient <$> strP <* A.char ':' <*> strP
5757

58-
newFileStore :: STM FileStore
58+
newFileStore :: IO FileStore
5959
newFileStore = do
60-
files <- TM.empty
61-
recipients <- TM.empty
62-
usedStorage <- newTVar 0
60+
files <- TM.emptyIO
61+
recipients <- TM.emptyIO
62+
usedStorage <- newTVarIO 0
6363
pure FileStore {files, recipients, usedStorage}
6464

6565
addFile :: FileStore -> SenderId -> FileInfo -> SystemTime -> STM (Either XFTPErrorType ())

0 commit comments

Comments
 (0)