Skip to content

Commit 908ea0b

Browse files
Enabled Zipfian key distribution for arbitrary commands. (#314)
* Fixed Zipfian support for arbitrary commands. * Added zipfian tests for arbitrary command * Prunned unecessary/unrelated changes. * Prunned unecessary/unrelated changes. * added sequential and zifpian test cases. --------- Co-authored-by: fcostaoliveira <[email protected]>
1 parent 68fb953 commit 908ea0b

File tree

4 files changed

+457
-6
lines changed

4 files changed

+457
-6
lines changed

config_types.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ bool arbitrary_command::set_key_pattern(const char* pattern_str) {
326326

327327
if (pattern_str[0] != 'R' &&
328328
pattern_str[0] != 'G' &&
329+
pattern_str[0] != 'Z' &&
329330
pattern_str[0] != 'S' &&
330331
pattern_str[0] != 'P') {
331332

memtier_benchmark.cpp

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -997,7 +997,7 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
997997
// command configuration always applied on last configured command
998998
arbitrary_command& cmd = cfg->arbitrary_commands->get_last_command();
999999
if (!cmd.set_key_pattern(optarg)) {
1000-
fprintf(stderr, "error: key-pattern for command %s must be in the format of [S/R/G/P].\n", cmd.command_name.c_str());
1000+
fprintf(stderr, "error: key-pattern for command %s must be in the format of [S/R/Z/G/P].\n", cmd.command_name.c_str());
10011001
return -1;
10021002
}
10031003
break;
@@ -1160,6 +1160,7 @@ void usage() {
11601160
" --command-key-pattern Key pattern for the command (default: R):\n"
11611161
" G for Gaussian distribution.\n"
11621162
" R for uniform Random.\n"
1163+
" Z for zipf distribution (will limit keys to positive).\n"
11631164
" S for Sequential.\n"
11641165
" P for Parallel (Sequential were each client has a subset of the key-range).\n"
11651166
"\n"
@@ -1790,7 +1791,21 @@ int main(int argc, char *argv[])
17901791
obj_gen->set_key_distribution(cfg.key_stddev, cfg.key_median);
17911792
}
17921793
obj_gen->set_expiry_range(cfg.expiry_range.min, cfg.expiry_range.max);
1793-
if (cfg.key_pattern[key_pattern_set] == 'Z' || cfg.key_pattern[key_pattern_get] == 'Z') {
1794+
1795+
// Check if Zipfian distribution is needed for global key patterns or arbitrary commands
1796+
bool needs_zipfian = (cfg.key_pattern[key_pattern_set] == 'Z' || cfg.key_pattern[key_pattern_get] == 'Z');
1797+
1798+
// Also check if any arbitrary command uses Zipfian distribution
1799+
if (!needs_zipfian && cfg.arbitrary_commands->is_defined()) {
1800+
for (size_t i = 0; i < cfg.arbitrary_commands->size(); i++) {
1801+
if (cfg.arbitrary_commands->at(i).key_pattern == 'Z') {
1802+
needs_zipfian = true;
1803+
break;
1804+
}
1805+
}
1806+
}
1807+
1808+
if (needs_zipfian) {
17941809
if (cfg.key_zipf_exp == 0.0) {
17951810
// user can't specify 0.0, so 0.0 means unset
17961811
cfg.key_zipf_exp = 1.0;

tests/tests_oss_zipfian_distribution.py

Lines changed: 320 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,20 @@
11
from collections import Counter
22
import math
3+
import tempfile
34
from itertools import pairwise
45

5-
from zipfian_benchmark_runner import ZipfianBenchmarkRunner
6+
from zipfian_benchmark_runner import ZipfianBenchmarkRunner, MonitorThread
7+
from include import (
8+
addTLSArgs,
9+
get_default_memtier_config,
10+
get_expected_request_count,
11+
add_required_env_arguments,
12+
ensure_clean_benchmark_folder,
13+
agg_info_commandstats,
14+
assert_minimum_memtier_outcomes
15+
)
16+
from mbdirector.benchmark import Benchmark
17+
from mbdirector.runner import RunConfig
618

719

820
def correlation_coeficient(x: list[float], y: list[float]) -> float:
@@ -94,3 +106,310 @@ def test_zipfian_exponent_effect(env):
94106

95107
# Higher exponent should have higher concentration in top keys
96108
env.assertTrue(concentration2 > concentration1)
109+
110+
111+
def test_zipfian_arbitrary_command_set(env):
112+
"""Test that arbitrary SET command with zipfian key pattern follows Zipf's law"""
113+
key_min = 1
114+
key_max = 10000
115+
116+
# Use the helper class to run benchmark with arbitrary commands
117+
runner = ZipfianBenchmarkRunner(env, key_min, key_max)
118+
commands = ["SET __key__ __data__"]
119+
combined_key_counts = runner.run_arbitrary_command_benchmark_and_collect_key_counting(
120+
env.testName, commands
121+
)
122+
123+
# Verify Zipfian properties using helper function
124+
correlation = analyze_zipfian_correlation(combined_key_counts)
125+
126+
# should be close to -1 for a perfect law relationship
127+
# should be negative since log(rank) is increasing and log(freq) is decreasing
128+
env.assertTrue(correlation < -0.8)
129+
130+
131+
def test_zipfian_arbitrary_command_hset(env):
132+
"""Test that arbitrary HSET command with zipfian key pattern follows Zipf's law"""
133+
key_min = 1
134+
key_max = 10000
135+
136+
# Use the helper class to run benchmark with arbitrary commands
137+
runner = ZipfianBenchmarkRunner(env, key_min, key_max)
138+
commands = ["HSET __key__ field1 __data__"]
139+
combined_key_counts = runner.run_arbitrary_command_benchmark_and_collect_key_counting(
140+
env.testName, commands
141+
)
142+
143+
# Verify Zipfian properties using helper function
144+
correlation = analyze_zipfian_correlation(combined_key_counts)
145+
146+
# should be close to -1 for a perfect law relationship
147+
env.assertTrue(correlation < -0.8)
148+
149+
150+
def test_zipfian_arbitrary_command_mixed_operations(env):
151+
"""Test multiple arbitrary commands with zipfian key pattern"""
152+
key_min = 1
153+
key_max = 10000
154+
155+
# Use the helper class to run benchmark with multiple arbitrary commands
156+
runner = ZipfianBenchmarkRunner(env, key_min, key_max)
157+
commands = [
158+
"SET __key__ __data__",
159+
"GET __key__",
160+
"HSET __key__ field1 __data__"
161+
]
162+
combined_key_counts = runner.run_arbitrary_command_benchmark_and_collect_key_counting(
163+
env.testName, commands
164+
)
165+
166+
# Verify Zipfian properties using helper function
167+
correlation = analyze_zipfian_correlation(combined_key_counts)
168+
169+
# should be close to -1 for a perfect law relationship
170+
env.assertTrue(correlation < -0.8)
171+
172+
# Verify that we have a reasonable distribution
173+
env.assertTrue(len(combined_key_counts) > 100) # Should access many different keys
174+
env.assertTrue(sum(combined_key_counts.values()) > 1000) # Should have many operations
175+
176+
177+
def test_zipfian_arbitrary_command_exponent_effect(env):
178+
"""Test different Zipfian exponents with arbitrary commands"""
179+
key_min = 1
180+
key_max = 10000
181+
182+
# Test with different exponents
183+
zipf_exponents = [0.5, 1.0, 2.0]
184+
distributions = {}
185+
186+
# Run benchmarks for each exponent
187+
runner = ZipfianBenchmarkRunner(env, key_min, key_max)
188+
commands = ["SET __key__ __data__"]
189+
190+
for zipf_exp in zipf_exponents:
191+
test_name = f"{env.testName}_exp_{zipf_exp}"
192+
combined_key_counts = runner.run_arbitrary_command_benchmark_and_collect_key_counting(
193+
test_name, commands, zipf_exp=zipf_exp
194+
)
195+
distributions[zipf_exp] = combined_key_counts
196+
197+
# Verify that higher exponents lead to more skewed distributions
198+
sorted_exponents = sorted(zipf_exponents)
199+
for i in range(len(sorted_exponents) - 1):
200+
exp1 = sorted_exponents[i]
201+
exp2 = sorted_exponents[i + 1]
202+
203+
dist1 = distributions[exp1]
204+
dist2 = distributions[exp2]
205+
206+
concentration1 = calculate_concentration_ratio(dist1)
207+
concentration2 = calculate_concentration_ratio(dist2)
208+
209+
# Higher exponent should have higher concentration in top keys
210+
env.assertTrue(concentration2 > concentration1)
211+
212+
213+
def test_sequential_zipfian_mixed_pattern(env):
214+
"""Test mixed sequential and zipfian key patterns (S:Z and Z:S)"""
215+
key_min = 1
216+
key_max = 1000 # Smaller range for clearer pattern analysis
217+
218+
# Test S:Z pattern (Sequential SET, Zipfian GET)
219+
runner = ZipfianBenchmarkRunner(env, key_min, key_max)
220+
221+
# Create benchmark specs with mixed pattern
222+
args = [
223+
"--ratio=1:1", # Equal SET and GET operations
224+
"--key-pattern=S:Z", # Sequential SET, Zipfian GET
225+
f"--key-minimum={key_min}",
226+
f"--key-maximum={key_max}",
227+
"--key-zipf-exp=1.0"
228+
]
229+
230+
benchmark_specs = {"name": f"{env.testName}_S_Z", "args": args}
231+
232+
addTLSArgs(benchmark_specs, env)
233+
config = get_default_memtier_config(threads=runner.threads, clients=runner.clients)
234+
master_nodes_list = env.getMasterNodesList()
235+
overall_expected_request_count = get_expected_request_count(config, key_min, key_max)
236+
add_required_env_arguments(benchmark_specs, config, env, master_nodes_list)
237+
238+
# Create temporary directory and run config
239+
test_dir = tempfile.mkdtemp()
240+
run_config = RunConfig(test_dir, f"{env.testName}_S_Z", config, {})
241+
ensure_clean_benchmark_folder(run_config.results_dir)
242+
243+
benchmark = Benchmark.from_json(run_config, benchmark_specs)
244+
245+
# Setup monitoring
246+
master_nodes_connections = env.getOSSMasterNodesConnectionList()
247+
248+
monitor_threads = []
249+
for conn in master_nodes_connections:
250+
# prevent accumulating stats from previous runs
251+
conn.execute_command("CONFIG", "RESETSTAT")
252+
253+
# start monitoring connection
254+
monitor_thread = MonitorThread(conn)
255+
monitor_thread.start()
256+
monitor_threads.append(monitor_thread)
257+
258+
# Run the benchmark
259+
memtier_ok = benchmark.run()
260+
261+
# Verify the benchmark ran successfully
262+
merged_command_stats = {
263+
"cmdstat_set": {"calls": 0},
264+
"cmdstat_get": {"calls": 0},
265+
}
266+
overall_request_count = agg_info_commandstats(
267+
master_nodes_connections, merged_command_stats
268+
)
269+
assert_minimum_memtier_outcomes(
270+
run_config,
271+
env,
272+
memtier_ok,
273+
overall_expected_request_count,
274+
overall_request_count,
275+
)
276+
277+
# Collect and combine results from all monitor threads
278+
combined_key_counts = Counter()
279+
for thread in monitor_threads:
280+
thread.join() # waits for monitor thread to finish
281+
combined_key_counts.update(thread.key_counts)
282+
283+
# For S:Z pattern, we expect:
284+
# - Some keys accessed more frequently (from zipfian GET operations)
285+
# - But also some sequential pattern influence (from SET operations)
286+
# The distribution should be less perfectly zipfian than pure Z:Z
287+
288+
# Verify we have reasonable key distribution
289+
env.assertTrue(len(combined_key_counts) > 50) # Should access many different keys
290+
env.assertTrue(sum(combined_key_counts.values()) > 100) # Should have many operations
291+
292+
# The correlation should be negative but may be weaker than pure zipfian
293+
correlation = analyze_zipfian_correlation(combined_key_counts)
294+
env.assertTrue(correlation < -0.3) # Weaker requirement due to mixed pattern
295+
296+
def test_zipfian_and_sequential_simultaneous_arbitrary_commands(env):
297+
"""Test simultaneous zipfian HSET and sequential HGETALL commands in high keyspace"""
298+
key_min = 950000 # Start near end of typical keyspace
299+
key_max = 1000000 # End at 1M
300+
301+
runner = ZipfianBenchmarkRunner(env, key_min, key_max)
302+
303+
# Run both commands simultaneously:
304+
# - HSET with zipfian key pattern (will create/update hash keys following zipfian distribution)
305+
# - HGETALL with sequential key pattern (will read hash keys sequentially)
306+
args = [
307+
f"--key-minimum={key_min}",
308+
f"--key-maximum={key_max}",
309+
"--command=HSET __key__ field1 __data__",
310+
"--command-key-pattern=Z", # Zipfian pattern for HSET
311+
"--command=HGETALL __key__",
312+
"--command-key-pattern=S", # Sequential pattern for HGETALL
313+
"--key-zipf-exp=1.0"
314+
]
315+
316+
benchmark_specs = {"name": env.testName, "args": args}
317+
addTLSArgs(benchmark_specs, env)
318+
config = get_default_memtier_config(threads=runner.threads, clients=runner.clients)
319+
master_nodes_list = env.getMasterNodesList()
320+
overall_expected_request_count = get_expected_request_count(config, key_min, key_max)
321+
add_required_env_arguments(benchmark_specs, config, env, master_nodes_list)
322+
323+
test_dir = tempfile.mkdtemp()
324+
run_config = RunConfig(test_dir, env.testName, config, {})
325+
ensure_clean_benchmark_folder(run_config.results_dir)
326+
327+
benchmark = Benchmark.from_json(run_config, benchmark_specs)
328+
329+
# Setup monitoring
330+
master_nodes_connections = env.getOSSMasterNodesConnectionList()
331+
332+
monitor_threads = []
333+
for conn in master_nodes_connections:
334+
conn.execute_command("CONFIG", "RESETSTAT")
335+
monitor_thread = MonitorThread(conn)
336+
monitor_thread.start()
337+
monitor_threads.append(monitor_thread)
338+
339+
# Run the benchmark with both commands
340+
memtier_ok = benchmark.run()
341+
342+
# Verify the benchmark ran successfully
343+
merged_command_stats = {
344+
"cmdstat_hset": {"calls": 0},
345+
"cmdstat_hgetall": {"calls": 0},
346+
}
347+
overall_request_count = agg_info_commandstats(master_nodes_connections, merged_command_stats)
348+
assert_minimum_memtier_outcomes(run_config, env, memtier_ok, overall_expected_request_count, overall_request_count)
349+
350+
# Collect and combine results from all monitor threads
351+
combined_key_counts = Counter()
352+
for thread in monitor_threads:
353+
thread.join()
354+
combined_key_counts.update(thread.key_counts)
355+
356+
# Verify we have reasonable key distribution
357+
env.assertTrue(len(combined_key_counts) > 100) # Should access many different keys
358+
env.assertTrue(sum(combined_key_counts.values()) > 100) # Should have many operations
359+
360+
# Extract key numbers and analyze patterns
361+
high_range_hits = 0
362+
total_hits = 0
363+
key_numbers = []
364+
365+
for key_name, count in combined_key_counts.items():
366+
total_hits += count
367+
if key_name.startswith("memtier-"):
368+
try:
369+
key_num = int(key_name.split("-")[1])
370+
key_numbers.append(key_num)
371+
if key_num >= 950000:
372+
high_range_hits += count
373+
except (IndexError, ValueError):
374+
pass
375+
376+
# Verify that we got significant hits in the high keyspace range
377+
high_range_percentage = high_range_hits / total_hits if total_hits > 0 else 0
378+
env.assertTrue(high_range_percentage > 0.6) # Should hit high range with mixed patterns
379+
380+
# Verify that keys are in the expected range
381+
if key_numbers:
382+
min_key_accessed = min(key_numbers)
383+
max_key_accessed = max(key_numbers)
384+
env.assertTrue(min_key_accessed >= key_min)
385+
env.assertTrue(max_key_accessed <= key_max)
386+
387+
# Check range coverage - should have good spread due to sequential HGETALL
388+
key_range_covered = max_key_accessed - min_key_accessed
389+
expected_range = key_max - key_min
390+
coverage_ratio = key_range_covered / expected_range
391+
env.assertTrue(coverage_ratio > 0.1) # Should cover at least 10% of range
392+
393+
# The combined pattern should show mixed characteristics:
394+
# - Some zipfian influence from HSET operations
395+
# - Some sequential influence from HGETALL operations
396+
# - Overall less extreme than pure zipfian but not completely uniform
397+
correlation = analyze_zipfian_correlation(combined_key_counts)
398+
env.assertTrue(-0.6 < correlation < -0.2) # Mixed pattern: moderate negative correlation
399+
400+
# Verify both command types were executed
401+
hset_calls = merged_command_stats.get("cmdstat_hset", {}).get("calls", 0)
402+
hgetall_calls = merged_command_stats.get("cmdstat_hgetall", {}).get("calls", 0)
403+
404+
env.assertTrue(hset_calls > 0) # Should have HSET operations
405+
env.assertTrue(hgetall_calls > 0) # Should have HGETALL operations
406+
407+
# Check that we have a reasonable mix of both operations
408+
total_commands = hset_calls + hgetall_calls
409+
if total_commands > 0:
410+
hset_ratio = hset_calls / total_commands
411+
hgetall_ratio = hgetall_calls / total_commands
412+
413+
# Both commands should contribute significantly (at least 20% each)
414+
env.assertTrue(hset_ratio > 0.2)
415+
env.assertTrue(hgetall_ratio > 0.2)

0 commit comments

Comments
 (0)