Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions database/ffldb/dbcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,19 +611,25 @@ func (c *dbCache) commitTx(tx *transaction) error {
c.cacheLock.RUnlock()

// Apply every key to add in the database transaction to the cache.
pendingKVs := make([]treap.KVPair, 0, tx.pendingKeys.Len())
tx.pendingKeys.ForEach(func(k, v []byte) bool {
pendingKVs = append(pendingKVs, treap.KVPair{Key: k, Value: v})

newCachedRemove = newCachedRemove.Delete(k)
newCachedKeys = newCachedKeys.Put(k, v)
return true
})
newCachedKeys = newCachedKeys.Put(pendingKVs...)
tx.pendingKeys = nil

// Apply every key to remove in the database transaction to the cache.
pendingRemoveKVs := make([]treap.KVPair, 0, tx.pendingRemove.Len())
tx.pendingRemove.ForEach(func(k, v []byte) bool {
pendingRemoveKVs = append(pendingRemoveKVs, treap.KVPair{Key: k, Value: v})

newCachedKeys = newCachedKeys.Delete(k)
newCachedRemove = newCachedRemove.Put(k, nil)
return true
})
newCachedRemove = newCachedRemove.Put(pendingRemoveKVs...)
tx.pendingRemove = nil

// Atomically replace the immutable treaps which hold the cached keys to
Expand Down
28 changes: 27 additions & 1 deletion database/internal/treap/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package treap

import (
"math/rand"
"sync"
"time"
)

Expand Down Expand Up @@ -33,6 +34,14 @@ var (
emptySlice = make([]byte, 0)
)

// treapNodePool defines a concurrent safe free list of treapNode used to
// provide temporary buffers.
var treapNodePool = sync.Pool{
New: func() any {
return &treapNode{}
},
}

// treapNode represents a node in the treap.
type treapNode struct {
key []byte
Expand All @@ -42,6 +51,16 @@ type treapNode struct {
right *treapNode
}

// recycle resets and puts the treapNode into the treapNodePool to be recycled.
func (n *treapNode) recycle() {
n.key = nil
n.value = nil
n.priority = 0
n.left = nil
n.right = nil
treapNodePool.Put(n)
}

// nodeSize returns the number of bytes the specified node occupies including
// the struct fields and the contents of the key and value.
func nodeSize(node *treapNode) uint64 {
Expand All @@ -51,7 +70,14 @@ func nodeSize(node *treapNode) uint64 {
// newTreapNode returns a new node from the given key, value, and priority. The
// node is not initially linked to any others.
func newTreapNode(key, value []byte, priority int) *treapNode {
return &treapNode{key: key, value: value, priority: priority}
n := treapNodePool.Get().(*treapNode)
n.key = key
n.value = value
n.priority = priority
n.left = nil
n.right = nil

return n
}

// parentStack represents a stack of parent treap nodes that are used during
Expand Down
106 changes: 93 additions & 13 deletions database/internal/treap/immutable.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ import (

// cloneTreapNode returns a shallow copy of the passed node.
func cloneTreapNode(node *treapNode) *treapNode {
return &treapNode{
key: node.key,
value: node.value,
priority: node.priority,
left: node.left,
right: node.right,
}
n := treapNodePool.Get().(*treapNode)
n.key = node.key
n.value = node.value
n.priority = node.priority
n.left = node.left
n.right = node.right

return n
}

// Immutable represents a treap data structure which is used to hold ordered
Expand Down Expand Up @@ -104,19 +105,79 @@ func (t *Immutable) Get(key []byte) []byte {
return nil
}

// Put inserts the passed key/value pair.
func (t *Immutable) Put(key, value []byte) *Immutable {
// KVPair is just a helper struct for a key-value pair that's going to be
// inserted into the treap.
type KVPair struct {
Key []byte
Value []byte
}

// Put puts the passed in key/value pairs into the treap. For operations
// requiring many insertions at once, Put is memory efficient as the
// intermediary treap nodes created between each put operation is recycled
// through an internal sync.Pool, reducing overall memory allocation.
func (t *Immutable) Put(kvPairs ...KVPair) *Immutable {
treap := t
var prevTreapNodes [staticDepth]*treapNode

for _, kvPair := range kvPairs {
newTreap, newTreapNodes := treap.put(kvPair.Key, kvPair.Value)

// Loop through the prevTreapNodes and check for treapNodes that
// are no longer being utilized. These will be garbaged collected
// and they're better off being recycled in the treapNodePool.
for _, node := range prevTreapNodes {
if node == nil {
break
}

// Make sure that the node we're going to recycle isn't
// being used by the latest immutable treap by checking
// if the pointer value of the node is the same.
got := newTreap.get(node.key)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

if got == node {
continue
}

// This node is only being used by the previous immutable
// copy and can safely be put into the treapNodePool to be
// recycled.
node.recycle()
}

// Replace with the latest treap and treap nodes.
treap = newTreap
prevTreapNodes = newTreapNodes
}

return treap
}

// put inserts the passed key/value pair and returns all the newly created
// treapNodes that were created during this put operation. The returned
// treapNodes can then be put into data structures like sync.Pool to reduce the
// memory overhead of allocating new treapNodes during multiple put calls.
func (t *Immutable) put(key, value []byte) (*Immutable, [staticDepth]*treapNode) {
// Use an empty byte slice for the value when none was provided. This
// ultimately allows key existence to be determined from the value since
// an empty byte slice is distinguishable from nil.
if value == nil {
value = emptySlice
}

// recycle is the treapNodes that are created during this put operation.
// We keep track of the nodes as the caller may be choose to recycle
// them to keep memory allocation low.
var (
recycle [staticDepth]*treapNode
currentRecycleIndex int
)

// The node is the root of the tree if there isn't already one.
if t.root == nil {
root := newTreapNode(key, value, rand.Int())
return newImmutable(root, 1, nodeSize(root))
recycle[currentRecycleIndex] = root
return newImmutable(root, 1, nodeSize(root)), recycle
}

// Find the binary tree insertion point and construct a replaced list of
Expand All @@ -132,6 +193,16 @@ func (t *Immutable) Put(key, value []byte) *Immutable {
for node := t.root; node != nil; {
// Clone the node and link its parent to it if needed.
nodeCopy := cloneTreapNode(node)

// Check if we still have space in the recycle for this node.
// It's ok if we don't put every single new node to be recycled
// as there's no guarantee in the sync.Pool that every recycled
// treapNode will be re-utilized.
if currentRecycleIndex < staticDepth {
recycle[currentRecycleIndex] = nodeCopy
currentRecycleIndex++
}

if oldParent := parents.At(0); oldParent != nil {
if oldParent.left == node {
oldParent.left = nodeCopy
Expand Down Expand Up @@ -161,11 +232,20 @@ func (t *Immutable) Put(key, value []byte) *Immutable {
newRoot := parents.At(parents.Len() - 1)
newTotalSize := t.totalSize - uint64(len(node.value)) +
uint64(len(value))
return newImmutable(newRoot, t.count, newTotalSize)
return newImmutable(newRoot, t.count, newTotalSize), recycle
}

// Link the new node into the binary tree in the correct position.
// Check if we still have space in the recycle for this node.
// It's ok if we don't put every single new node to be recycled
// as there's no guarantee in the sync.Pool that every recycled
// treapNode will be re-utilized.
node := newTreapNode(key, value, rand.Int())
if currentRecycleIndex < staticDepth {
recycle[currentRecycleIndex] = node
currentRecycleIndex++
}

// Link the new node into the binary tree in the correct position.
parent := parents.At(0)
if compareResult < 0 {
parent.left = node
Expand Down Expand Up @@ -205,7 +285,7 @@ func (t *Immutable) Put(key, value []byte) *Immutable {
}
}

return newImmutable(newRoot, t.count+1, t.totalSize+nodeSize(node))
return newImmutable(newRoot, t.count+1, t.totalSize+nodeSize(node)), recycle
}

// Delete removes the passed key from the treap and returns the resulting treap
Expand Down
Loading
Loading