This repository was archived by the owner on Sep 28, 2023. It is now read-only.

Commit 782a835

Use atomic batching of writes
Under the hood, RocksDB's put wraps each write in a WriteBatch anyway; batching explicitly at our level just moves that use "over the hood". I'm guessing it also means we only have to acquire the memtable+WAL locks once per partition update, instead of once per row.
1 parent b5a1bb5 commit 782a835
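
The sketch below is a hypothetical, standalone illustration of the before/after shape of this change, assuming only the stock RocksJava API (org.rocksdb.RocksDB, WriteBatch, WriteOptions); it is not code from the commit, and the class and method names are made up. Individual merge() calls each go through RocksDB's write path on their own, while a WriteBatch stages the merges and applies them atomically with a single write() call; WriteOptions.setDisableWAL() covers the no-commit-log case that the old code handled with a cached option object.

import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

// Hypothetical illustration only; not part of this commit.
public final class BatchedWriteSketch
{
    // Before: one trip through RocksDB's write path per key/value pair.
    static void mergeOneByOne(RocksDB db, byte[][] keys, byte[][] values) throws RocksDBException
    {
        for (int i = 0; i < keys.length; i++)
            db.merge(keys[i], values[i]);
    }

    // After: merges are staged in a WriteBatch and applied atomically with a single write().
    static void mergeAsBatch(RocksDB db, boolean writeCommitLog, byte[][] keys, byte[][] values) throws RocksDBException
    {
        try (WriteBatch batch = new WriteBatch();
             WriteOptions options = new WriteOptions().setDisableWAL(!writeCommitLog))
        {
            for (int i = 0; i < keys.length; i++)
                batch.merge(keys[i], values[i]);
            db.write(options, batch);
        }
    }
}

Note that the commit itself does not close the batch or the options; the try-with-resources above is only for the standalone sketch.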

3 files changed: +74 -67 lines changed

src/java/org/apache/cassandra/rocksdb/RocksDBCF.java

Lines changed: 8 additions & 12 deletions
@@ -57,14 +57,14 @@
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Hex;
+
 import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.BloomFilter;
 import org.rocksdb.CassandraCompactionFilter;
 import org.rocksdb.CassandraValueMergeOperator;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.CompactionPriority;
-import org.rocksdb.CompressionType;
 import org.rocksdb.DBOptions;
 import org.rocksdb.Env;
 import org.rocksdb.FlushOptions;
@@ -75,6 +75,7 @@
 import org.rocksdb.RocksDBException;
 import org.rocksdb.Statistics;
 import org.rocksdb.StatsLevel;
+import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteOptions;
 
 import static org.apache.cassandra.rocksdb.RocksDBConfigs.MERGE_OPERANDS_LIMIT;
@@ -98,7 +99,6 @@ public class RocksDBCF implements RocksDBCFMBean
     private final CassandraValueMergeOperator mergeOperator;
 
     private final ReadOptions readOptions;
-    private final WriteOptions disableWAL;
     private final FlushOptions flushOptions;
 
     private final int gcGraceSeconds;
@@ -152,7 +152,6 @@ public RocksDBCF(ColumnFamilyStore cfs) throws RocksDBException
         // until compaction happens. However in our case, range deletion is only used to remove ranges
         // no longer owned by this node. In such case, stale keys would never be quried.
         readOptions = new ReadOptions().setIgnoreRangeDeletions(true);
-        disableWAL = new WriteOptions().setDisableWAL(true);
         flushOptions = new FlushOptions().setWaitForFlush(true);
 
         // Register the mbean.
@@ -309,17 +308,14 @@ public RocksDBIteratorAdapter newShardIterator(int shardId, ReadOptions options)
         return new RocksDBIteratorAdapter(rocksDB.newIterator(options), rocksMetrics);
     }
 
-    public void merge(DecoratedKey partitionKey, byte[] key, byte[] value) throws RocksDBException
+    public void write(DecoratedKey partitionKey, WriteBatch batch, boolean writeCommitLog) throws RocksDBException
     {
-        RocksDB rocksDB = getRocksDBFromKey(partitionKey);
-        if (RocksDBConfigs.DISABLE_WRITE_TO_COMMITLOG)
-        {
-            rocksDB.merge(disableWAL, key, value);
-        }
-        else
-        {
-            rocksDB.merge(key, value);
+        WriteOptions options = new WriteOptions();
+        if(!writeCommitLog) {
+            options.setDisableWAL(true);
         }
+
+        getRocksDBFromKey(partitionKey).write(options, batch);
     }
 
     public void deleteRange(byte[] start, byte[] end) throws RocksDBException

src/java/org/apache/cassandra/rocksdb/RocksDBEngine.java

Lines changed: 47 additions & 45 deletions
@@ -33,6 +33,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ListenableFutureTask;
+import org.rocksdb.WriteBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -114,20 +115,64 @@ public void openColumnFamilyStore(ColumnFamilyStore cfs)
 
     public void apply(ColumnFamilyStore cfs, PartitionUpdate update, UpdateTransaction indexer, boolean writeCommitLog)
     {
+        //TODO: WriteBatch takes a hint for how many bytes to reserve that we might want to use.
+        WriteBatch batch = new WriteBatch();
         DecoratedKey partitionKey = update.partitionKey();
 
         for (Row row : update)
         {
-            applyRowToRocksDB(cfs, writeCommitLog, partitionKey, indexer, row);
+            addWriteToBatch(batch, cfs, partitionKey, row, indexer);
         }
 
         Row staticRow = update.staticRow();
         if (!staticRow.isEmpty())
         {
-            applyRowToRocksDB(cfs, writeCommitLog, partitionKey, indexer, staticRow);
+            addWriteToBatch(batch, cfs, partitionKey, staticRow, indexer);
+        }
+
+        RocksDBCF dbcf = rocksDBFamily.get(cfs.metadata.cfId);
+        try {
+            dbcf.write(partitionKey, batch, writeCommitLog);
+        }
+        catch (RocksDBException e)
+        {
+            logger.error(e.toString(), e);
+        } finally {
+            indexer.commit();
+        }
+    }
+
+    private void addWriteToBatch(WriteBatch b,
+                                 ColumnFamilyStore cfs,
+                                 DecoratedKey partitionKey,
+                                 Row row,
+                                 UpdateTransaction indexer)
+    {
+
+        Clustering clustering = row.clustering();
+
+        byte[] rocksDBKey = RowKeyEncoder.encode(partitionKey, clustering, cfs.metadata);
+        byte[] rocksDBValue = RowValueEncoder.encode(cfs.metadata, row);
+
+        b.merge(rocksDBKey, rocksDBValue);
+
+        if (indexer != UpdateTransaction.NO_OP)
+        {
+            try
+            {
+                secondaryIndexMetrics.rsiTotalInsertions.inc();
+                indexer.onInserted(row);
+            }
+            catch (RuntimeException e)
+            {
+                secondaryIndexMetrics.rsiInsertionFailures.inc();
+                logger.error(e.toString(), e);
+                throw new StorageEngineException("Index update failed", e);
+            }
         }
     }
 
+
     public UnfilteredRowIterator queryStorage(ColumnFamilyStore cfs, SinglePartitionReadCommand readCommand)
     {
         Partition partition = new RocksDBPartition(rocksDBFamily.get(cfs.metadata.cfId),
@@ -221,49 +266,6 @@ public AbstractStreamReceiveTask getStreamReceiveTask(StreamSession session, Str
         return new RocksDBStreamReceiveTask(session, summary.cfId, summary.files, summary.totalSize);
     }
 
-    private void applyRowToRocksDB(ColumnFamilyStore cfs,
-                                   boolean writeCommitLog,
-                                   DecoratedKey partitionKey,
-                                   UpdateTransaction indexer,
-                                   Row row)
-    {
-
-        Clustering clustering = row.clustering();
-
-        byte[] rocksDBKey = RowKeyEncoder.encode(partitionKey, clustering, cfs.metadata);
-        byte[] rocksDBValue = RowValueEncoder.encode(cfs.metadata, row);
-
-        try
-        {
-            indexer.start();
-            rocksDBFamily.get(cfs.metadata.cfId).merge(partitionKey, rocksDBKey, rocksDBValue);
-            if (indexer != UpdateTransaction.NO_OP)
-            {
-                try
-                {
-                    secondaryIndexMetrics.rsiTotalInsertions.inc();
-                    indexer.onInserted(row);
-                }
-                catch (RuntimeException e)
-                {
-                    secondaryIndexMetrics.rsiInsertionFailures.inc();
-                    logger.error(e.toString(), e);
-                    throw new StorageEngineException("Index update failed", e);
-                }
-
-            }
-        }
-        catch (RocksDBException e)
-        {
-            logger.error(e.toString(), e);
-            throw new StorageEngineException("Row merge failed", e);
-        }
-        finally
-        {
-            indexer.commit();
-        }
-    }
-
     public static RocksDBCF getRocksDBCF(UUID cfId)
     {
         ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(cfId);
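
The TODO in apply() above refers to RocksJava's WriteBatch(int reserved_bytes) constructor, which pre-sizes the batch's internal buffer so appending rows does not repeatedly grow it. A minimal sketch of how that hint could be supplied follows; the helper class and the 2x cushion are purely illustrative, not part of this commit.

import org.rocksdb.WriteBatch;

// Hypothetical helper, not from this commit.
final class WriteBatchSizing
{
    static WriteBatch presized(int estimatedBytes)
    {
        // Reserve roughly twice the payload estimate to leave room for
        // RocksDB's per-record framing; the factor is an arbitrary guess.
        return new WriteBatch(2 * estimatedBytes);
    }
}

In apply() the estimate might come from something like update.dataSize(), though Cassandra's own accounting will not match the RocksDB-encoded size exactly, so it can only serve as a hint.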

test/unit/org/apache/cassandra/rocksdb/RocksDBCFTest.java

Lines changed: 19 additions & 10 deletions
@@ -35,7 +35,9 @@
 import org.apache.cassandra.rocksdb.encoding.value.RowValueEncoder;
 import org.apache.cassandra.utils.Hex;
 import org.rocksdb.IndexType;
+import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -47,6 +49,13 @@ public class RocksDBCFTest extends RocksDBTestBase
 {
     final DecoratedKey dk = Util.dk("test_key");
 
+    public void writeKeyValue(RocksDBCF cf, DecoratedKey partitionKey, byte[] key, byte[] value) throws RocksDBException
+    {
+        WriteBatch batch = new WriteBatch();
+        batch.merge(key, value);
+        cf.write(partitionKey, batch, true);
+    }
+
     @Test
     public void testMerge() throws RocksDBException
     {
@@ -57,7 +66,7 @@ public void testMerge() throws RocksDBException
         RocksDBCF rocksDBCF = RocksDBEngine.getRocksDBCF(cfs.metadata.cfId);
         byte[] key = "test_key".getBytes();
         byte[] value = encodeValue(cfs, "test_value");
-        rocksDBCF.merge(dk, key, value);
+        writeKeyValue(rocksDBCF, dk, key, value);
         assertArrayEquals(value, rocksDBCF.get(dk, key));
     }
 
@@ -84,10 +93,10 @@ public void testDeleteRange() throws RocksDBException
         byte[] d = "d".getBytes();
         byte[] value = encodeValue(cfs, "test_value");
 
-        rocksDBCF.merge(dk, a, value);
-        rocksDBCF.merge(dk, b, value);
-        rocksDBCF.merge(dk, c, value);
-        rocksDBCF.merge(dk, d, value);
+        writeKeyValue(rocksDBCF, dk, a, value);
+        writeKeyValue(rocksDBCF, dk, b, value);
+        writeKeyValue(rocksDBCF, dk, c, value);
+        writeKeyValue(rocksDBCF, dk, d, value);
 
         rocksDBCF.deleteRange(b, d);
         rocksDBCF.compactRange();
@@ -109,7 +118,7 @@ public void testTruncate() throws RocksDBException
         byte[] key = "test_key".getBytes();
         byte[] value = encodeValue(cfs, "test_value");
 
-        rocksDBCF.merge(dk, key, value);
+        writeKeyValue(rocksDBCF, dk, key, value);
         assertArrayEquals(value, rocksDBCF.get(dk, key));
 
         rocksDBCF.truncate();
@@ -130,7 +139,7 @@ public void testClose() throws RocksDBException
         byte[] key = "test_key".getBytes();
         byte[] value = encodeValue(cfs, "test_value");
 
-        rocksDBCF.merge(dk, key, value);
+        writeKeyValue(rocksDBCF, dk, key, value);
 
         assertArrayEquals(value, rocksDBCF.get(dk, key));
 
@@ -157,9 +166,9 @@ public void testDumpPrefix() throws Exception
         ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
         RocksDBCF rocksDBCF = RocksDBEngine.getRocksDBCF(cfs.metadata.cfId);
 
-        rocksDBCF.merge(dk, "test_key1".getBytes(), "test_value11".getBytes());
-        rocksDBCF.merge(dk, "test_key1".getBytes(), "test_value12".getBytes());
-        rocksDBCF.merge(dk, "test_key2".getBytes(), "test_value2".getBytes());
+        writeKeyValue(rocksDBCF, dk, "test_key1".getBytes(), "test_value11".getBytes());
+        writeKeyValue(rocksDBCF, dk, "test_key1".getBytes(), "test_value12".getBytes());
+        writeKeyValue(rocksDBCF, dk, "test_key2".getBytes(), "test_value2".getBytes());
 
         String dump = rocksDBCF.dumpPrefix(dk, "test_key".getBytes(), Integer.MAX_VALUE);
         assertEquals(2, dump.split("\n").length);
