Skip to content

Commit 27861bf

Browse files
authored
Merge pull request #458 from scylladb/example_allocations
examples: add an example with allocation stats
2 parents b8e7e70 + 7062d59 commit 27861bf

File tree

2 files changed

+181
-0
lines changed

2 files changed

+181
-0
lines changed

examples/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ tracing-subscriber = "0.2.16"
1717
chrono = "0.4"
1818
uuid = "1.0"
1919
tower = "0.4"
20+
stats_alloc = "0.1"
21+
clap = { version = "3.2.4", features = ["derive"] }
2022

2123
[[example]]
2224
name = "auth"
@@ -93,3 +95,7 @@ path = "custom_load_balancing_policy.rs"
9395
[[example]]
9496
name = "tower"
9597
path = "tower.rs"
98+
99+
[[example]]
100+
name = "allocations"
101+
path = "allocations.rs"

examples/allocations.rs

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
use anyhow::Result;
2+
use scylla::{statement::prepared_statement::PreparedStatement, Session, SessionBuilder};
3+
use std::io::Write;
4+
use std::sync::Arc;
5+
6+
use tokio::sync::Semaphore;
7+
8+
use stats_alloc::{Stats, StatsAlloc, INSTRUMENTED_SYSTEM};
9+
use std::alloc::System;
10+
11+
use clap::{ArgEnum, Parser};
12+
13+
#[derive(Parser, Debug)]
14+
#[clap(author, version, about, long_about = None)]
15+
struct Args {
16+
#[clap(arg_enum, value_parser, default_value = "all")]
17+
mode: Mode,
18+
19+
#[clap(short, long, value_parser, default_value = "127.0.0.1:9042")]
20+
node: String,
21+
22+
#[clap(short, long, value_parser, default_value_t = 256usize)]
23+
parallelism: usize,
24+
25+
#[clap(short, long, value_parser, default_value_t = 100_000usize)]
26+
requests: usize,
27+
}
28+
29+
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, ArgEnum)]
30+
enum Mode {
31+
All,
32+
Insert,
33+
Select,
34+
}
35+
36+
#[global_allocator]
37+
static GLOBAL: &StatsAlloc<System> = &INSTRUMENTED_SYSTEM;
38+
39+
fn print_stats(stats: &stats_alloc::Stats, reqs: f64) {
40+
println!(
41+
"allocs/req: {:9.2}",
42+
stats.allocations as f64 / reqs
43+
);
44+
println!(
45+
"reallocs/req: {:9.2}",
46+
stats.reallocations as f64 / reqs
47+
);
48+
println!(
49+
"frees/req: {:9.2}",
50+
stats.deallocations as f64 / reqs
51+
);
52+
println!(
53+
"bytes allocated/req: {:9.2}",
54+
stats.bytes_allocated as f64 / reqs
55+
);
56+
println!(
57+
"bytes reallocated/req: {:9.2}",
58+
stats.bytes_reallocated as f64 / reqs
59+
);
60+
println!(
61+
"bytes freed/req: {:9.2}",
62+
stats.bytes_deallocated as f64 / reqs
63+
);
64+
}
65+
66+
async fn measure(
67+
session: Arc<Session>,
68+
prepared: Arc<PreparedStatement>,
69+
sem: Arc<Semaphore>,
70+
reqs: usize,
71+
parallelism: usize,
72+
) -> Stats {
73+
let initial_stats = GLOBAL.stats();
74+
75+
for i in 0..reqs {
76+
if i % 10000 == 0 {
77+
print!(".");
78+
std::io::stdout().flush().unwrap();
79+
}
80+
let session = session.clone();
81+
let prepared = prepared.clone();
82+
let permit = sem.clone().acquire_owned().await;
83+
tokio::task::spawn(async move {
84+
let i = i;
85+
session
86+
.execute(&prepared, (i as i32, 2 * i as i32))
87+
.await
88+
.unwrap();
89+
90+
let _permit = permit;
91+
});
92+
}
93+
println!();
94+
95+
// Wait for all in-flight requests to finish
96+
for _ in 0..parallelism {
97+
sem.acquire().await.unwrap().forget();
98+
}
99+
100+
GLOBAL.stats() - initial_stats
101+
}
102+
103+
#[tokio::main]
104+
async fn main() -> Result<()> {
105+
let args = Args::parse();
106+
println!("{:?}", args);
107+
108+
println!("Connecting to {} ...", args.node);
109+
110+
let session: Session = SessionBuilder::new().known_node(args.node).build().await?;
111+
let session = Arc::new(session);
112+
113+
session.query("CREATE KEYSPACE IF NOT EXISTS ks WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1}", &[]).await?;
114+
session.await_schema_agreement().await.unwrap();
115+
116+
session
117+
.query(
118+
"CREATE TABLE IF NOT EXISTS ks.alloc_test (a int, b int, c text, primary key (a, b))",
119+
&[],
120+
)
121+
.await?;
122+
123+
session.await_schema_agreement().await.unwrap();
124+
125+
let prepared_inserts = Arc::new(
126+
session
127+
.prepare("INSERT INTO ks.alloc_test (a, b, c) VALUES (?, ?, 'abc')")
128+
.await?,
129+
);
130+
131+
let prepared_selects = Arc::new(
132+
session
133+
.prepare("SELECT * FROM ks.alloc_test WHERE a = ? and b = ?")
134+
.await?,
135+
);
136+
137+
let sem = Arc::new(Semaphore::new(args.parallelism));
138+
139+
if args.mode == Mode::All || args.mode == Mode::Insert {
140+
print!("Sending {} inserts, hold tight ", args.requests);
141+
let write_stats = measure(
142+
session.clone(),
143+
prepared_inserts.clone(),
144+
sem.clone(),
145+
args.requests,
146+
args.parallelism,
147+
)
148+
.await;
149+
println!("----------");
150+
println!("Inserts:");
151+
println!("----------");
152+
print_stats(&write_stats, args.requests as f64);
153+
println!("----------");
154+
sem.add_permits(args.parallelism);
155+
}
156+
157+
if args.mode == Mode::All || args.mode == Mode::Select {
158+
print!("Sending {} selects, hold tight ", args.requests);
159+
let read_stats = measure(
160+
session.clone(),
161+
prepared_selects.clone(),
162+
sem.clone(),
163+
args.requests,
164+
args.parallelism,
165+
)
166+
.await;
167+
println!("----------");
168+
println!("Selects:");
169+
println!("----------");
170+
print_stats(&read_stats, args.requests as f64);
171+
println!("----------");
172+
}
173+
174+
Ok(())
175+
}

0 commit comments

Comments
 (0)