Skip to content

Commit 207c93c

Browse files
tjruwase, jerryyangli, Yang Li, GuanhuaWang, loadams
authored
DeepNVMe update (#966)
* Fast model checkpointing * Support both legacy and serialized formats * Add io_buffer_mb option * Bug fix * Force flush * More model options; Refactor common codes * --gpu option * --half and more flexible options * Add deepspeed.save_checkpoint() * Free ds memory * Improve repro * Double I/O buffer (#56) * Double I/O buffer (#60) * Add checkpoint comparison (#62) * Add checkpoint comparison * Corrected a typo Co-authored-by: Yang Li <[email protected]> * save_checkpoint perf monitoring * Disable checkpoint save on exit * Perf statistics for save_checkpoint (#64) * save_checkpoint perf monitoring * Disable checkpoint save on exit * add logs for a100-80 * add torch* error log with half flag but without fused flag * log for error * local rank arg * Handle local_rank arg (#78) * save_checkpoint perf monitoring * Disable checkpoint save on exit * local rank arg * Single writer option * Single writer option (#79) * save_checkpoint perf monitoring * Disable checkpoint save on exit * local rank arg * Single writer option * Allow missing folder * DP writer refactor * Update for DS; Add GDS Signed-off-by: Olatunji Ruwase <[email protected]> * Integrate GDS into deepspeed_model_save * Rebase fast persist (#184) * Fast model checkpointing * Support both legacy and serialized formats * Add io_buffer_mb option * Bug fix * Force flush * More model options; Refactor common codes * --gpu option * --half and more flexible options * Add deepspeed.save_checkpoint() * Free ds memory * Improve repro * Double I/O buffer (#56) * Double I/O buffer (#60) * Add checkpoint comparison (#62) * Add checkpoint comparison * Corrected a typo Co-authored-by: Yang Li <[email protected]> * save_checkpoint perf monitoring * Disable checkpoint save on exit * Perf statistics for save_checkpoint (#64) * save_checkpoint perf monitoring * Disable checkpoint save on exit * add logs for a100-80 * add torch* error log with half flag but without fused flag * log for error * local rank arg * Handle local_rank 
arg (#78) * save_checkpoint perf monitoring * Disable checkpoint save on exit * local rank arg * Single writer option * Single writer option (#79) * save_checkpoint perf monitoring * Disable checkpoint save on exit * local rank arg * Single writer option * Allow missing folder * DP writer refactor * Update for DS; Add GDS Signed-off-by: Olatunji Ruwase <[email protected]> * Integrate GDS into deepspeed_model_save --------- Signed-off-by: Olatunji Ruwase <[email protected]> Co-authored-by: jerryyangli <[email protected]> Co-authored-by: Yang Li <[email protected]> Co-authored-by: GuanhuaWang <[email protected]> * Move folder Signed-off-by: Olatunji Ruwase <[email protected]> * Remove folder Signed-off-by: Olatunji Ruwase <[email protected]> * More cleanup Signed-off-by: Olatunji Ruwase <[email protected]> * torch changes Signed-off-by: Olatunji Ruwase <[email protected]> * sglang+zero_inference * Remove file * Add offload configs * Add pin_memory * Cleanup scripts * SGLang README * Remove file --------- Signed-off-by: Olatunji Ruwase <[email protected]> Co-authored-by: jerryyangli <[email protected]> Co-authored-by: Yang Li <[email protected]> Co-authored-by: GuanhuaWang <[email protected]> Co-authored-by: Logan Adams <[email protected]> Co-authored-by: Hongwei Chen <[email protected]> Co-authored-by: Zhipeng Wang <[email protected]>
1 parent 86aeab2 commit 207c93c

17 files changed

+4620
-2
lines changed

deepnvme/file_access/aio_load_cpu_tensor.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import os, timeit, functools
33
from deepspeed.ops.op_builder import AsyncIOBuilder
44
from utils import parse_read_arguments, GIGA_UNIT
5+
from deepspeed.accelerator import get_accelerator
56

67
def file_read(inp_f, handle, bounce_buffer):
78
handle.sync_pread(bounce_buffer, inp_f)
@@ -14,7 +15,12 @@ def main():
1415
cnt = args.loop
1516

1617
aio_handle = AsyncIOBuilder().load().aio_handle()
17-
bounce_buffer = torch.empty(os.path.getsize(input_file), dtype=torch.uint8).pin_memory()
18+
native_locked_tensor = get_accelerator()._name == 'cpu'
19+
20+
if native_locked_tensor:
21+
bounce_buffer = aio_handle.new_cpu_locked_tensor(file_sz, torch.Tensor().to(torch.uint8))
22+
else:
23+
bounce_buffer = torch.empty(file_sz, dtype=torch.uint8).pin_memory()
1824

1925
t = timeit.Timer(functools.partial(file_read, input_file, aio_handle, bounce_buffer))
2026
aio_t = t.timeit(cnt)
@@ -27,5 +33,8 @@ def main():
2733
py_tensor = py_file_read(input_file)
2834
print(f'Validation success = {aio_tensor.equal(py_tensor)}')
2935

36+
if native_locked_tensor:
37+
aio_handle.free_cpu_locked_tensor(bounce_buffer)
38+
3039
if __name__ == "__main__":
3140
main()

deepnvme/file_access/aio_store_cpu_tensor.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import os, timeit, functools, pathlib
33
from deepspeed.ops.op_builder import AsyncIOBuilder
44
from utils import parse_write_arguments, GIGA_UNIT
5+
from deepspeed.accelerator import get_accelerator
56

67
def file_write(out_f, tensor, handle, bounce_buffer):
78
bounce_buffer.copy_(tensor)
@@ -14,9 +15,13 @@ def main():
1415
pathlib.Path(output_file).unlink(missing_ok=True)
1516
file_sz = args.mb_size*(1024**2)
1617
app_tensor = torch.empty(file_sz, dtype=torch.uint8, device='cpu', requires_grad=False)
18+
native_locked_tensor = get_accelerator()._name == 'cpu'
1719

1820
aio_handle = AsyncIOBuilder().load().aio_handle()
19-
bounce_buffer = torch.empty(file_sz, dtype=torch.uint8, requires_grad=False).pin_memory()
21+
if native_locked_tensor:
22+
bounce_buffer = aio_handle.new_cpu_locked_tensor(file_sz, torch.Tensor().to(torch.uint8))
23+
else:
24+
bounce_buffer = torch.empty(file_sz, dtype=torch.uint8, requires_grad=False).pin_memory()
2025

2126

2227
t = timeit.Timer(functools.partial(file_write, output_file, app_tensor, aio_handle, bounce_buffer))
@@ -33,6 +38,9 @@ def main():
3338
filecmp.clear_cache()
3439
print(f'Validation success = {filecmp.cmp(py_ref_file, output_file, shallow=False) }')
3540

41+
if native_locked_tensor:
42+
aio_handle.free_cpu_locked_tensor(bounce_buffer)
43+
3644
pathlib.Path(output_file).unlink(missing_ok=True)
3745

3846

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
import time
2+
import torch
3+
import os
4+
import shutil
5+
import gc
6+
import random
7+
import numpy as np
8+
import deepspeed
9+
from deepspeed.accelerator import get_accelerator
10+
from save_model_utils import get_model, validate_arguments, parse_arguments
11+
12+
def _get_ds_config(args, writer_type, use_gds):
13+
ds_config = {
14+
"train_micro_batch_size_per_gpu": 1,
15+
"zero_optimization": {
16+
"stage": args.zero_stage,
17+
"cpu_offload": args.cpu_offload
18+
},
19+
"fp16": {
20+
"enabled": args.half
21+
},
22+
"optimizer": {
23+
"type": "Adam",
24+
"params": {
25+
"torch_adam": not args.fused
26+
}
27+
},
28+
"checkpoint": {
29+
"checkpoint_serialization": not args.legacy
30+
},
31+
"aio": {
32+
"block_size": 8 * (1024**2),
33+
"queue_depth": 8,
34+
"single_submit": False,
35+
"overlap_events": True,
36+
"intra_op_parallelism": 2,
37+
"use_gds": use_gds,
38+
}
39+
}
40+
41+
if writer_type:
42+
ds_config["checkpoint"]["writer"] = {
43+
"type": writer_type,
44+
"io_buffer_size": args.io_buffer_mb * (1024**2),
45+
"io_buffer_double": not args.single_io_buffer,
46+
"show_statistics": not args.no_statistics,
47+
"data_parallel": "socket" # None # not args.single_writer
48+
}
49+
50+
return ds_config
51+
52+
53+
def _get_ds_engine(model, ds_config):
    """Wrap *model* in a DeepSpeed engine configured by *ds_config*."""
    engine = deepspeed.initialize(model=model,
                                  model_parameters=model.parameters(),
                                  config=ds_config)[0]
    return engine
58+
59+
60+
def _do_optimizer_step(ds_engine):
61+
for p in ds_engine.module.parameters():
62+
p.grad = torch.zeros_like(p)
63+
ds_engine.step()
64+
65+
66+
def _free_ds_memory(ds_engine):
    """Release engine-held references and reclaim host/device memory."""
    # Break the reference chain engine -> optimizer -> inner optimizer,
    # then drop the module and the engine itself.
    ds_engine.optimizer.optimizer = None
    ds_engine.optimizer = None
    ds_engine.module = None
    del ds_engine
    gc.collect()
    # Return freed device blocks to the allocator's free pool.
    get_accelerator().empty_cache()
74+
75+
76+
def test_save(tag, folder, model, args, writer_type):
    """Checkpoint *model* into *folder* under *tag*; return save time in seconds."""
    # GDS is only meaningful for the 'fast' writer, and only for the gds tag.
    use_gds = (writer_type == 'fast') and ('gds' in tag)
    engine = _get_ds_engine(model, _get_ds_config(args, writer_type, use_gds))
    if args.zero_stage == 0:
        _do_optimizer_step(engine)

    start = time.time()
    engine.save_checkpoint(save_dir=folder, tag=tag)
    elapsed = time.time() - start
    _free_ds_memory(engine)
    return elapsed
88+
89+
90+
def _get_folder_size(folder):
91+
size = 0
92+
for path, _, files in os.walk(folder):
93+
size += sum([os.path.getsize(os.path.join(path, f)) for f in files])
94+
return size
95+
96+
97+
def run(model, model_name, ckpt_name, args):
    """Benchmark each checkpoint writer backend for *model* and print GB/s.

    For every (tag, writer_type) pair the previous checkpoint folder is
    removed, a fresh save is timed, and throughput is reported from the
    on-disk checkpoint size.
    """
    print(f'Model name = {model_name}')
    # tag -> checkpoint writer backend (None = plain deepspeed save path).
    writer_dict = {
        'test_save': None,
        'test_ds_mock_save': 'mock',
        'test_ds_py_save': 'python',
        'test_ds_aio_fast_save': 'fast',
        'test_ds_gds_fast_save': 'fast',
    }
    for tag, writer_type in writer_dict.items():
        folder = os.path.join(args.folder, ckpt_name, tag)
        if os.path.exists(folder):
            # Start from a clean slate so size/throughput reflect this run only.
            shutil.rmtree(folder, ignore_errors=True)
        write_sec = test_save(tag, folder, model, args, writer_type)
        ckpt_size = _get_folder_size(folder)
        gb_size = ckpt_size / (1024**3)
        gb_per_sec = gb_size / write_sec
        print(
            f'{tag} -- {gb_size:5.2f} GB, {write_sec:5.2f} secs, {gb_per_sec:5.2f} GB/s'
        )
        print('*********************************************')
120+
121+
122+
def main():
    """Entry point: seed RNGs, parse and validate args, then run the benchmark."""
    print('Performance test of deepspeed integration of fast model checkpointing.')
    print(f'torch version = {torch.__version__}')
    # Fixed seeds for reproducible model init and data across runs.
    torch.manual_seed(42)
    np.random.seed(0)
    random.seed(0)
    args = parse_arguments()
    if not validate_arguments(args):
        # Plain return instead of quit(): quit() is meant for the interactive
        # interpreter; returning here ends the script identically.
        return

    model, model_name, ckpt_name = get_model(args.model)
    run(model, model_name, ckpt_name, args)


if __name__ == "__main__":
    main()
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
transformers
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
import argparse
2+
import os
3+
from transformers import AutoModelForCausalLM
4+
from transformers import T5ForConditionalGeneration
5+
from torch_save_utils import PINNED_BUFFER_MB
6+
7+
8+
# Short command-line tags for the supported models.
GPT2L = 'gpt2-large'
TINY_T5 = 'tiny-t5'
PHI3_MINI = 'phi3'
PHI3_VISION = 'phi3-v'
LLAMA3_1B = 'llama3-1B'

# Map of local tag -> HuggingFace hub model name.
HF_MODELS_DICT = {
    TINY_T5: "hf-internal-testing/tiny-random-t5",
    GPT2L: GPT2L,
    PHI3_MINI: "microsoft/Phi-3.5-mini-instruct",
    PHI3_VISION: "microsoft/Phi-3.5-vision-instruct",
    LLAMA3_1B: "meta-llama/Llama-3.2-1B",
}
21+
22+
def _get_hf_model(tag):
    """Load the HuggingFace model for *tag*; return (model, hub_name, tag)."""
    hub_name = HF_MODELS_DICT[tag]
    # The tiny T5 test model is seq2seq; everything else loads as causal LM.
    loader = T5ForConditionalGeneration if tag == TINY_T5 else AutoModelForCausalLM
    return loader.from_pretrained(hub_name), hub_name, tag
30+
31+
def get_model(model_tag):
    """Public wrapper: resolve *model_tag* via the HF loader helper."""
    return _get_hf_model(model_tag)
33+
34+
35+
def validate_arguments(args):
    """Return True when the parsed arguments form a usable combination.

    Prints a diagnostic for each rejected condition rather than raising, so
    all problems are reported in one pass.
    """
    success = True

    if args.model not in HF_MODELS_DICT:
        print(f'{args.model} is not a supported HF model tag')
        success = False

    # fp16 optimizer state is only supported on GPU tensors.
    if args.optimizer and args.half and not args.gpu:
        print('mixed precision only supported with gpu tensors')
        success = False

    return success
48+
49+
50+
def parse_arguments():
    """Parse and echo command-line options for the checkpoint-save benchmark.

    Returns:
        argparse.Namespace with folder, model, local_rank, legacy, optimizer,
        fused, gpu, half, io_buffer_mb, zero_stage, cpu_offload,
        no_statistics, and single_io_buffer.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--folder',
                        default=None,
                        type=str,
                        required=True,
                        help='Folder to use for I/O.')

    parser.add_argument(
        '--model',
        default=None,
        type=str,
        required=True,
        help=f'HuggingFace tag of model. Available models = {list(HF_MODELS_DICT.keys())}')

    # Accepted (and ignored) so the deepspeed launcher can pass it through.
    parser.add_argument('--local_rank',
                        type=int,
                        default=0,
                        help='Local rank')

    parser.add_argument('--legacy',
                        action='store_true',
                        help='Use torch legacy save format')

    parser.add_argument('--optimizer',
                        action='store_true',
                        help='Include optimizer state in checkpoint.')

    parser.add_argument('--fused',
                        action='store_true',
                        help='Use fused fp16 optimizer.')

    parser.add_argument('--gpu', action='store_true', help='Use gpu tensors.')

    parser.add_argument('--half',
                        action='store_true',
                        help='Use half-precision tensors.')

    parser.add_argument(
        '--io_buffer_mb',
        type=int,
        default=PINNED_BUFFER_MB,
        help=f'Size of pinned i/o buffer in MB. Default = {PINNED_BUFFER_MB}')

    parser.add_argument('--zero_stage',
                        type=int,
                        default=0,
                        help='ZeRO optimization stage. Default = 0')

    parser.add_argument('--cpu_offload',
                        action='store_true',
                        help='Enable CPU offload of optimizer state.')

    # NOTE: hyphenated flag; argparse exposes it as args.no_statistics.
    parser.add_argument('--no-statistics',
                        action='store_true',
                        help='Suppress low-level performance statistics.')

    parser.add_argument('--single_io_buffer',
                        action='store_true',
                        help='Disable double buffering of i/o buffer.')

    args = parser.parse_args()
    print(f'args = {args}')
    return args

0 commit comments

Comments
 (0)