
Commit 69984f1

WIP: Create snapshots from lockfiles
1 parent 98f5210 commit 69984f1

File tree

3 files changed: +489 -243 lines changed

images/protected-publish/pkg/common.py

Lines changed: 241 additions & 5 deletions
@@ -51,6 +51,31 @@
 MAX_CONCURRENCY = 10
 USE_THREADS = True
 
+SNAPSHOT_TAG_REGEXES = [
+    re.compile(r"^develop-[\d]{4}-[\d]{2}-[\d]{2}$"),
+    re.compile(r"^v([\d]+)\.([\d]+)\.[\d]+$"),
+]
+
+PROTECTED_BRANCH_REGEXES = [
+    re.compile(r"^develop$"),
+    re.compile(r"^releases/v[\d]+\.[\d]+$"),
+]
+
+
+def tag_source_branch(tag):
+    """Parse a tag and return the source branch"""
+    m = SNAPSHOT_TAG_REGEXES[0].match(tag)
+    if m:
+        return "develop"
+
+    m = SNAPSHOT_TAG_REGEXES[1].match(tag)
+    if m:
+        major, minor = m.groups()
+        return f"releases/v{major}.{minor}"
+
+    return None
+
 
 ################################################################################
 # Encapsulate information about a built spec in a mirror
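Editor's note: given the two patterns in SNAPSHOT_TAG_REGEXES, the new helper behaves as in this minimal sketch; the tag values below are made up for illustration and do not appear in the commit:

    # Hypothetical inputs, assuming the regexes above:
    tag_source_branch("develop-2025-01-31")  # -> "develop"
    tag_source_branch("v0.23.1")             # -> "releases/v0.23"
    tag_source_branch("my-feature-tag")      # -> None (not a snapshot tag)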
@@ -85,12 +110,13 @@ def bucket_name_from_s3_url(url):
 
 ################################################################################
 #
-def spec_catalogs_from_listing_v2(listing_path: str) -> Dict[str, Dict[str, BuiltSpec]]:
+def spec_catalogs_from_listing_v2(listing_path: Optional[str] = None) -> Dict[str, Dict[str, BuiltSpec]]:
     """Return a complete catalog of all the built specs in the listing
 
     Return a complete catalog of all the built specs for every prefix in the
     listing. The returned dictionary of catalogs is keyed by unique prefix.
     """
+    listing_path = list_prefix_contents(list_url, listing_path)
     all_catalogs: Dict[str, Dict[str, BuiltSpec]] = defaultdict(
         lambda: defaultdict(BuiltSpec)
     )
@@ -128,7 +154,8 @@ def spec_catalogs_from_listing_v2(listing_path: str) -> Dict[str, Dict[str, Buil
 
 ################################################################################
 #
-def spec_catalogs_from_listing_v3(listing_path: str) -> Dict[str, Dict[str, BuiltSpec]]:
+def spec_catalogs_from_listing_v3(listing_path: Optional[str] = None) -> Dict[str, Dict[str, BuiltSpec]]:
+    listing_path = list_prefix_contents(list_url, listing_path)
     all_catalogs: Dict[str, Dict[str, BuiltSpec]] = defaultdict(
         lambda: defaultdict(BuiltSpec)
     )
@@ -149,6 +176,189 @@ def spec_catalogs_from_listing_v3(listing_path: str) -> Dict[str, Dict[str, Buil
     return all_catalogs
 
 
+################################################################################
+#
+def generate_spec_catalogs_v2(
+    ref: str, exclude: List[str] = [], listing_path: Optional[str] = None
+) -> tuple[Dict[str, Dict[str, BuiltSpec]], Dict[str, BuiltSpec]]:
+    """Return information about specs in stacks and at the root
+
+    Read the listing file, populate and return a tuple of dicts indicating which
+    specs exist in stacks, and which exist in the top-level buildcache. Stacks
+    appearing in the ``exclude`` list are ignored.
+
+    Returns a tuple like the following:
+
+        (
+            # First element of tuple is the stack specs
+            {
+                <stack>: {
+                    <hash>: <BuiltSpec>,
+                    ...
+                },
+                ...
+            },
+            # Followed by specs at the top level
+            {
+                <hash>: <BuiltSpec>,
+                ...
+            }
+        )
+    """
+    listing_path = list_prefix_contents(list_url, listing_path)
+
+    stack_prefix_regex = re.compile(rf"{ref}/(.+)")
+    stack_specs: Dict[str, Dict[str, BuiltSpec]] = defaultdict(
+        lambda: defaultdict(BuiltSpec)
+    )
+    all_catalogs = spec_catalogs_from_listing_v2(listing_path)
+    top_level_specs = all_catalogs[ref]
+
+    for prefix in all_catalogs:
+        m = stack_prefix_regex.search(prefix)
+        if not m:
+            continue
+
+        stack = m.group(1)
+        if stack in exclude:
+            continue
+
+        for spec_hash, built_spec in all_catalogs[prefix].items():
+            stack_specs[stack][spec_hash] = built_spec
+
+    return stack_specs, top_level_specs
+
+
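Editor's note: a minimal sketch of calling the v2 catalog generator; the ref and the excluded stack name here are placeholder values, not taken from this commit:

    # Hypothetical usage; "develop" and "build_systems" are example values.
    stack_specs, top_level_specs = generate_spec_catalogs_v2(
        "develop", exclude=["build_systems"]
    )
    for stack, specs in stack_specs.items():
        print(f"{stack}: {len(specs)} built specs")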
+def format_blob_url(prefix: str, blob_record: Dict[str, str]) -> str:
+    """Use prefix and algorithm/checksum from record to build full prefix"""
+    hash_algo = blob_record.get("checksumAlgorithm", None)
+    checksum = blob_record.get("checksum", None)
+
+    if not hash_algo:
+        raise MalformedManifestError("Missing 'checksumAlgorithm'")
+
+    if not checksum:
+        raise MalformedManifestError("Missing 'checksum'")
+
+    return f"{prefix}/blobs/{hash_algo}/{checksum[:2]}/{checksum}"
+
+
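Editor's note: the returned prefix follows a sharded blob layout, with the first two characters of the checksum as an intermediate directory. A sketch with a made-up record:

    record = {"checksumAlgorithm": "sha256", "checksum": "abc123def456"}
    format_blob_url("develop/my-stack", record)
    # -> "develop/my-stack/blobs/sha256/ab/abc123def456"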
+def find_data_with_media_type(
+    data: List[Dict[str, str]], mediaType: str
+) -> Dict[str, str]:
+    """Return data element with matching mediaType, or else raise"""
+    for elt in data:
+        if elt["mediaType"] == mediaType:
+            return elt
+    raise NoSuchMediaTypeError(mediaType)
+
+
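Editor's note: a small sketch of the lookup helper; the mediaType strings below are placeholders (the real values come from SPEC_METADATA_MEDIA_TYPE and TARBALL_MEDIA_TYPE, used later in this diff):

    data = [
        {"mediaType": "example/spec-metadata", "checksum": "aa11"},
        {"mediaType": "example/tarball", "checksum": "bb22"},
    ]
    find_data_with_media_type(data, "example/tarball")  # -> second record
    find_data_with_media_type(data, "example/missing")  # raises NoSuchMediaTypeError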
+################################################################################
+#
+def generate_spec_catalogs_v3(
+    bucket: str,
+    ref: str,
+    exclude: List[str] = [],
+    listing_path: Optional[str] = None,
+    parallel: int = 8,
+) -> tuple[Dict[str, Dict[str, BuiltSpec]], Dict[str, BuiltSpec]]:
+    """Return information about specs in stacks and at the root"""
+    listing_path = list_prefix_contents(list_url, listing_path)
+
+    stack_prefix_regex = re.compile(rf"{ref}/(.+)")
+    stack_specs: Dict[str, Dict[str, BuiltSpec]] = defaultdict(
+        lambda: defaultdict(BuiltSpec)
+    )
+    all_catalogs = spec_catalogs_from_listing_v3(listing_path)
+    top_level_specs = all_catalogs[ref]
+
+    task_list = []
+    tmpdir = tempfile.mkdtemp()
+
+    for prefix in all_catalogs:
+        m = stack_prefix_regex.search(prefix)
+        if not m:
+            continue
+
+        stack = m.group(1)
+        if stack in exclude:
+            continue
+
+        stack_manifests_dir = os.path.join(tmpdir, stack)
+        os.makedirs(stack_manifests_dir)
+        stack_manifest_sync_cmd = [
+            "aws",
+            "s3",
+            "sync",
+            "--exclude",
+            "*",
+            "--include",
+            "*.spec.manifest.json",
+            f"s3://{bucket}/{prefix}/v3/manifests/spec",
+            stack_manifests_dir,
+        ]
+
+        start_time = datetime.now()
+
+        try:
+            print(f"Downloading manifests for stack {stack}")
+            subprocess.run(stack_manifest_sync_cmd, check=True)
+        except subprocess.CalledProcessError as cpe:
+            error_msg = getattr(cpe, "message", cpe)
+            print(f"Failed to download manifests for {stack} due to: {error_msg}")
+            continue
+
+        end_time = datetime.now()
+        elapsed = end_time - start_time
+        print(f"Downloaded manifests for stack {stack}, elapsed time: {elapsed}")
+
+        for spec_hash, built_spec in all_catalogs[prefix].items():
+            stack_specs[stack][spec_hash] = built_spec
+            task_list.append((built_spec.hash, stack))
+
+    def _process_manifest_fn(spec_hash, stack):
+        download_dir = os.path.join(tmpdir, stack)
+        find_cmd = ["find", download_dir, "-type", "f", "-name", f"*{spec_hash}*"]
+        find_result = subprocess.run(find_cmd, capture_output=True)
+
+        if find_result.returncode != 0:
+            print(f"[{find_cmd}] failed to find manifest for {spec_hash} in {stack}")
+            return (None, None, None, None)
+
+        manifest_path = find_result.stdout.decode("utf-8").strip()
+        manifest_dict = extract_json_from_clearsig(manifest_path)
+        return (spec_hash, stack, manifest_dict, manifest_path)
+
+    with ThreadPoolExecutor(max_workers=parallel) as executor:
+        futures = [executor.submit(_process_manifest_fn, *task) for task in task_list]
+        for future in as_completed(futures):
+            try:
+                spec_hash, stack, manifest_dict, manifest_path = future.result()
+                if not spec_hash or not stack or not manifest_dict or not manifest_path:
+                    continue
+
+                stack_specs[stack][spec_hash].stack = stack
+                stack_specs[stack][spec_hash].manifest_path = manifest_path
+                stack_specs[stack][spec_hash].meta = format_blob_url(
+                    f"{ref}/{stack}",
+                    find_data_with_media_type(
+                        manifest_dict["data"], SPEC_METADATA_MEDIA_TYPE
+                    ),
+                )
+                stack_specs[stack][spec_hash].archive = format_blob_url(
+                    f"{ref}/{stack}",
+                    find_data_with_media_type(
+                        manifest_dict["data"], TARBALL_MEDIA_TYPE
+                    ),
+                )
+            except Exception as exc:
+                print(f"Exception processing manifests: {exc}")
+
+    # Cleanup the tmpdir
+    shutil.rmtree(tmpdir)
+    return stack_specs, top_level_specs
+
+
 ################################################################################
 # If the cli didn't provide a working directory, we will create (and clean up)
 # a temporary directory.
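Editor's note: the v3 generator first mirrors each stack's spec manifests locally with aws s3 sync, then parses them in a thread pool, which suits the per-manifest work (find plus clearsigned-JSON extraction) being I/O-bound. A hypothetical invocation, with placeholder bucket and stack names:

    stack_specs, top_level_specs = generate_spec_catalogs_v3(
        "my-mirror-bucket", "develop", exclude=["deprecated-stack"], parallel=8
    )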
@@ -159,15 +369,26 @@ def get_workdir_context(workdir: Optional[str] = None):
     return contextlib.nullcontext(workdir)
 
 
+listing_file = None
 ################################################################################
 # Given a url and a file path to use for writing, get a recursive listing of
 # everything under the prefix defined by the url, and write it to disk using the
 # supplied path.
-def list_prefix_contents(url: str, output_file: str):
+def list_prefix_contents(url: str, output_file: Optional[str] = None, force: bool = False):
     list_cmd = ["aws", "s3", "ls", "--recursive", url]
 
-    with open(output_file, "w") as f:
-        subprocess.run(list_cmd, stdout=f, check=True)
+    # Auto caching of listing file
+    global listing_file
+    if not output_file:
+        if not listing_file:
+            # mkstemp() returns (fd, path); keep the path and close the fd
+            fd, listing_file = tempfile.mkstemp()
+            os.close(fd)
+        output_file = listing_file
+
+    if not os.path.isfile(output_file) or force:
+        with open(output_file, "w") as f:
+            subprocess.run(list_cmd, stdout=f, check=True)
+
+    return output_file
 
 
 ################################################################################
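Editor's note: with the caching above, repeated calls that omit output_file reuse one shared temp file. The cache is keyed only on the module-level global, not on the URL, so callers listing different prefixes should pass distinct output_file paths. A sketch with a placeholder bucket:

    path = list_prefix_contents("s3://my-bucket/develop")       # runs aws s3 ls
    same = list_prefix_contents("s3://my-bucket/develop")       # cache hit, no ls
    assert path == same
    list_prefix_contents("s3://my-bucket/develop", force=True)  # refresh in place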
@@ -303,6 +524,21 @@ def s3_upload_file(file_path: str, bucket: str, prefix: str, client=None):
         s3_client.upload_fileobj(fd, bucket, prefix)
 
 
+def s3_object_exists(bucket: str, key: str, client=None):
+    """Check if an s3 object exists"""
+
+    if client:
+        s3_client = client
+    else:
+        s3_client = s3_create_client()
+
+    try:
+        _ = s3_client.head_object(Bucket=bucket, Key=key)
+        return True
+    except Exception:
+        return False
+
+
 ################################################################################
 #
 def compute_checksum(input_file: str, buf_size: int = 65536) -> str: