diff --git a/docs/docs/concepts/backends.md b/docs/docs/concepts/backends.md index 0213b669d4..5446cef160 100644 --- a/docs/docs/concepts/backends.md +++ b/docs/docs/concepts/backends.md @@ -929,6 +929,34 @@ projects: * `sizes` - read * `ssh_key` - create, read, update,delete +### Crusoe Cloud + +Log in to your [Crusoe Cloud](https://console.crusoecloud.com/) console and create an API key +under your account settings. Note your project ID from the project settings page. + +Then, go ahead and configure the backend: + +
+ +```yaml +projects: +- name: main + backends: + - type: crusoe + project_id: your-project-id + creds: + type: access_key + access_key: your-access-key + secret_key: your-secret-key + regions: + - us-east1-a + - us-southcentral1-a +``` + +
+ +`regions` is optional. If not specified, all available Crusoe regions are used. + ### Hot Aisle Log in to the SSH TUI as described in the [Hot Aisle Quick Start](https://hotaisle.xyz/quick-start/). diff --git a/docs/docs/reference/server/config.yml.md b/docs/docs/reference/server/config.yml.md index 26c01d73e2..80e48b028e 100644 --- a/docs/docs/reference/server/config.yml.md +++ b/docs/docs/reference/server/config.yml.md @@ -335,6 +335,23 @@ to configure [backends](../../concepts/backends.md) and other [server-level sett type: required: true +##### `projects[n].backends[type=crusoe]` { #crusoe data-toc-label="crusoe" } + +#SCHEMA# dstack._internal.core.backends.crusoe.models.CrusoeBackendConfigWithCreds + overrides: + show_root_heading: false + type: + required: true + item_id_prefix: crusoe- + +###### `projects[n].backends[type=crusoe].creds` { #crusoe-creds data-toc-label="creds" } + +#SCHEMA# dstack._internal.core.backends.crusoe.models.CrusoeAccessKeyCreds + overrides: + show_root_heading: false + type: + required: true + ##### `projects[n].backends[type=hotaisle]` { #hotaisle data-toc-label="hotaisle" } #SCHEMA# dstack._internal.core.backends.hotaisle.models.HotAisleBackendConfigWithCreds diff --git a/frontend/src/types/backend.d.ts b/frontend/src/types/backend.d.ts index b41d796cee..dfe74648d4 100644 --- a/frontend/src/types/backend.d.ts +++ b/frontend/src/types/backend.d.ts @@ -1,6 +1,7 @@ declare type TBackendType = | 'aws' | 'azure' + | 'crusoe' | 'cudo' | 'datacrunch' | 'dstack' diff --git a/pyproject.toml b/pyproject.toml index 8e45dde6bd..3336fc5423 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,7 @@ dependencies = [ "python-multipart>=0.0.16", "filelock", "psutil", - "gpuhunt==0.1.16", + "gpuhunt==0.1.17", "argcomplete>=3.5.0", "ignore-python>=0.2.0", "orjson", @@ -256,6 +256,9 @@ fluentbit = [ "elasticsearch>=8.0.0", "dstack[server]", ] +crusoe = [ + "dstack[server]", +] all = [ - 
"dstack[gateway,server,aws,azure,gcp,verda,kubernetes,lambda,nebius,oci,fluentbit]", + "dstack[gateway,server,aws,azure,gcp,verda,kubernetes,lambda,nebius,oci,crusoe,fluentbit]", ] diff --git a/src/dstack/_internal/core/backends/configurators.py b/src/dstack/_internal/core/backends/configurators.py index ec7f976c53..75a4a86abb 100644 --- a/src/dstack/_internal/core/backends/configurators.py +++ b/src/dstack/_internal/core/backends/configurators.py @@ -35,6 +35,15 @@ except ImportError: pass +try: + from dstack._internal.core.backends.crusoe.configurator import ( + CrusoeConfigurator, + ) + + _CONFIGURATOR_CLASSES.append(CrusoeConfigurator) +except ImportError: + pass + try: from dstack._internal.core.backends.cudo.configurator import ( CudoConfigurator, diff --git a/src/dstack/_internal/core/backends/crusoe/backend.py b/src/dstack/_internal/core/backends/crusoe/backend.py new file mode 100644 index 0000000000..9f81f136d1 --- /dev/null +++ b/src/dstack/_internal/core/backends/crusoe/backend.py @@ -0,0 +1,16 @@ +from dstack._internal.core.backends.base.backend import Backend +from dstack._internal.core.backends.crusoe.compute import CrusoeCompute +from dstack._internal.core.backends.crusoe.models import CrusoeConfig +from dstack._internal.core.models.backends.base import BackendType + + +class CrusoeBackend(Backend): + TYPE = BackendType.CRUSOE + COMPUTE_CLASS = CrusoeCompute + + def __init__(self, config: CrusoeConfig): + self.config = config + self._compute = CrusoeCompute(self.config) + + def compute(self) -> CrusoeCompute: + return self._compute diff --git a/src/dstack/_internal/core/backends/crusoe/compute.py b/src/dstack/_internal/core/backends/crusoe/compute.py new file mode 100644 index 0000000000..10ede96776 --- /dev/null +++ b/src/dstack/_internal/core/backends/crusoe/compute.py @@ -0,0 +1,436 @@ +from collections.abc import Iterable +from typing import List, Optional + +import gpuhunt +from gpuhunt.providers.crusoe import CrusoeProvider + +from 
dstack._internal.core.backends.base.backend import Compute
+from dstack._internal.core.backends.base.compute import (
+    ComputeWithAllOffersCached,
+    ComputeWithCreateInstanceSupport,
+    ComputeWithMultinodeSupport,
+    ComputeWithPlacementGroupSupport,
+    ComputeWithPrivilegedSupport,
+    generate_unique_instance_name,
+    get_shim_commands,
+)
+from dstack._internal.core.backends.base.offers import (
+    OfferModifier,
+    get_catalog_offers,
+    get_offers_disk_modifier,
+)
+from dstack._internal.core.backends.crusoe.models import CrusoeConfig
+from dstack._internal.core.backends.crusoe.resources import CrusoeClient
+from dstack._internal.core.errors import BackendError, NotYetTerminated
+from dstack._internal.core.models.backends.base import BackendType
+from dstack._internal.core.models.common import CoreModel
+from dstack._internal.core.models.instances import (
+    InstanceAvailability,
+    InstanceConfiguration,
+    InstanceOffer,
+    InstanceOfferWithAvailability,
+)
+from dstack._internal.core.models.placement import (
+    PlacementGroup,
+    PlacementGroupProvisioningData,
+    PlacementStrategy,
+)
+from dstack._internal.core.models.resources import Memory, Range
+from dstack._internal.core.models.runs import JobProvisioningData, Requirements
+from dstack._internal.utils.logging import get_logger
+
+logger = get_logger(__name__)
+
+# Range for the persistent data disk created for instance types without ephemeral NVMe.
+CONFIGURABLE_DISK_SIZE = Range[Memory](
+    min=Memory.parse("50GB"),
+    max=Memory.parse("5000GB"),
+)
+# Timeouts (seconds) handed to the client's wait_for_disk_operation / wait_for_vm_operation
+# when create_instance waits for the data disk and the VM to be created.
+WAIT_FOR_DISK_TIMEOUT = 30
+WAIT_FOR_VM_TIMEOUT = 120
+
+# Shell commands run first in the VM startup script: force `AllowTcpForwarding yes`
+# in sshd_config and restart the SSH service so the change takes effect.
+SETUP_COMMANDS = [
+    'sed -i "s/.*AllowTcpForwarding.*/AllowTcpForwarding yes/g" /etc/ssh/sshd_config',
+    "service ssh restart",
+]
+
+# Set up storage on the best available disk and move containerd there.
+# Docker on Crusoe images delegates image storage to containerd's native snapshotter,
+# so /var/lib/containerd is what determines container disk space.
+# Handles: /dev/vdb (persistent data disk we create) or /dev/nvme* (ephemeral NVMe). +# For multiple NVMe drives, uses mdadm RAID-0 for maximum space. +STORAGE_SETUP_COMMANDS = [ + ( + "DISK='' && " + "if [ -b /dev/vdb ]; then DISK=/dev/vdb; " + "elif ls /dev/nvme*n1 >/dev/null 2>&1; then" + " NVME_DEVS=$(ls /dev/nvme*n1 2>/dev/null);" + " NVME_COUNT=$(echo $NVME_DEVS | wc -w);" + " if [ $NVME_COUNT -eq 1 ]; then DISK=$NVME_DEVS;" + " elif [ $NVME_COUNT -gt 1 ]; then" + " apt-get install -y -qq mdadm >/dev/null 2>&1 || true;" + " mdadm --create /dev/md0 --level=0 --raid-devices=$NVME_COUNT $NVME_DEVS --force --run;" + " DISK=/dev/md0;" + " fi;" + "fi && " + 'if [ -n "$DISK" ]; then' + " mkfs.ext4 -q -F $DISK" + " && mkdir -p /data" + " && mount $DISK /data" + " && service docker stop" + " && systemctl stop containerd || true" + " && mkdir -p /data/containerd" + " && rsync -a /var/lib/containerd/ /data/containerd/" + " && mount --bind /data/containerd /var/lib/containerd" + " && systemctl start containerd || true" + " && service docker start" + "; fi" + ), +] + +IMAGE_SXM_DOCKER = "ubuntu22.04-nvidia-sxm-docker:latest" +IMAGE_PCIE_DOCKER = "ubuntu22.04-nvidia-pcie-docker:latest" +IMAGE_ROCM = "ubuntu-rocm:latest" +IMAGE_BASE = "ubuntu22.04:latest" + + +def _get_image(instance_name: str, gpu_type: str) -> str: + if not gpu_type: + return IMAGE_BASE + # Check instance name for SXM -- gpu_type from gpuhunt is normalized (e.g. "A100") + # and doesn't contain "SXM", but instance names like "a100-80gb-sxm-ib.8x" do. 
+ if "-sxm" in instance_name.lower(): + return IMAGE_SXM_DOCKER + if "MI3" in gpu_type: + return IMAGE_ROCM + return IMAGE_PCIE_DOCKER + + +def _is_ib_type(instance_name: str) -> bool: + prefix = instance_name.split(".")[0] + return prefix.endswith("-ib") or prefix.endswith("-roce") + + +def _get_instance_family(instance_name: str) -> str: + return instance_name.rsplit(".", 1)[0] + + +def _has_ephemeral_disk(offer: InstanceOffer) -> bool: + """Check if the instance type has ephemeral NVMe storage via gpuhunt provider_data.""" + backend_data = offer.backend_data or {} + return backend_data.get("disk_gb", 0) > 0 + + +class CrusoeCompute( + ComputeWithAllOffersCached, + ComputeWithCreateInstanceSupport, + ComputeWithPrivilegedSupport, + ComputeWithMultinodeSupport, + ComputeWithPlacementGroupSupport, + Compute, +): + def __init__(self, config: CrusoeConfig): + super().__init__() + self.config = config + self._client = CrusoeClient(config.creds, config.project_id) + self._catalog = gpuhunt.Catalog(balance_resources=False, auto_reload=False) + self._catalog.add_provider( + CrusoeProvider( + access_key=config.creds.access_key, + secret_key=config.creds.secret_key, + project_id=config.project_id, + ) + ) + + def get_all_offers_with_availability(self) -> List[InstanceOfferWithAvailability]: + offers = get_catalog_offers( + backend=BackendType.CRUSOE, + locations=self.config.regions or None, + catalog=self._catalog, + ) + quota_map = self._get_quota_map() + result = [] + for offer in offers: + family = _get_instance_family(offer.instance.name) + availability = InstanceAvailability.UNKNOWN + for prog_name, available in quota_map.items(): + if family.startswith(prog_name) or prog_name.startswith(family): + availability = ( + InstanceAvailability.AVAILABLE + if available > 0 + else InstanceAvailability.NO_QUOTA + ) + break + result.append( + InstanceOfferWithAvailability( + **offer.dict(), + availability=availability, + ) + ) + return result + + def _get_quota_map(self) -> 
dict[str, int]: + try: + quotas = self._client.list_quotas() + except Exception: + logger.warning("Failed to fetch Crusoe quotas, availability will be UNKNOWN") + return {} + result = {} + for q in quotas: + prog_name = q.get("programmatic_name", "") + available = q.get("available", 0) + category = q.get("category", "") + if "Instance" in category: + result[prog_name] = available + return result + + def get_offers_modifiers(self, requirements: Requirements) -> Iterable[OfferModifier]: + # Only adjust disk size for types without ephemeral NVMe (disk_gb == 0). + # Types with ephemeral NVMe already have their disk_size set by gpuhunt. + base_modifier = get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements) + + def modifier( + offer: InstanceOfferWithAvailability, + ) -> Optional[InstanceOfferWithAvailability]: + if _has_ephemeral_disk(offer): + return offer + return base_modifier(offer) + + return [modifier] + + def create_instance( + self, + instance_offer: InstanceOfferWithAvailability, + instance_config: InstanceConfiguration, + placement_group: Optional[PlacementGroup], + ) -> JobProvisioningData: + instance_name = generate_unique_instance_name(instance_config) + region = instance_offer.region + + ib_partition_id = None + if placement_group: + assert placement_group.provisioning_data is not None + pg_data = CrusoePlacementGroupBackendData.load( + placement_group.provisioning_data.backend_data + ) + ib_partition_id = pg_data.ib_partition_id + + gpus = instance_offer.instance.resources.gpus + gpu_type = gpus[0].name if gpus else "" + instance_type_name = instance_offer.instance.name + image = _get_image(instance_type_name, gpu_type) + + needs_data_disk = not _has_ephemeral_disk(instance_offer) + # Always include storage setup: it auto-detects /dev/vdb (data disk) or + # /dev/nvme* (ephemeral NVMe) and moves containerd storage there. 
+        commands = SETUP_COMMANDS + STORAGE_SETUP_COMMANDS + get_shim_commands(is_privileged=True)
+        # All commands are chained with `&&` under `set -e`.
+        startup_script = "#!/bin/bash\nset -e\n" + " && ".join(commands)
+
+        # Both are tracked outside the try block so the except branch knows what to clean up.
+        data_disk_id = None
+        create_op = None
+        try:
+            if needs_data_disk:
+                disk_size_mib = instance_offer.instance.resources.disk.size_mib
+                # Whole GiB, rounded down, but never less than 1.
+                disk_size_gib = max(disk_size_mib // 1024, 1)
+                disk_op = self._client.create_disk(
+                    name=f"{instance_name}-data",
+                    size=f"{disk_size_gib}GiB",
+                    location=region,
+                )
+                data_disk_id = disk_op["metadata"]["id"]
+                self._client.wait_for_disk_operation(
+                    disk_op["operation_id"], timeout=WAIT_FOR_DISK_TIMEOUT
+                )
+
+            disks = None
+            if data_disk_id:
+                disks = [
+                    {"disk_id": data_disk_id, "mode": "read-write", "attachment_type": "data"}
+                ]
+
+            host_channel_adapters = None
+            if ib_partition_id:
+                host_channel_adapters = [{"ib_partition_id": ib_partition_id}]
+
+            create_op = self._client.create_vm(
+                name=instance_name,
+                vm_type=instance_type_name,
+                location=region,
+                ssh_public_key=instance_config.get_public_keys()[0],
+                image=image,
+                startup_script=startup_script,
+                disks=disks,
+                host_channel_adapters=host_channel_adapters,
+            )
+            vm_id = create_op["metadata"]["id"]
+            self._client.wait_for_vm_operation(
+                create_op["operation_id"], timeout=WAIT_FOR_VM_TIMEOUT
+            )
+        except BaseException:
+            # Best-effort rollback: delete the VM first (if its creation was requested),
+            # then the data disk. Cleanup failures are logged; the original error is re-raised.
+            if create_op is not None:
+                vm_id_to_delete = create_op.get("metadata", {}).get("id")
+                if vm_id_to_delete:
+                    try:
+                        self._client.delete_vm(vm_id_to_delete)
+                    except Exception as e:
+                        logger.exception("Could not delete VM %s: %s", vm_id_to_delete, e)
+            if data_disk_id:
+                try:
+                    self._client.delete_disk(data_disk_id)
+                except Exception as e:
+                    logger.exception("Could not delete disk %s: %s", data_disk_id, e)
+            raise
+
+        # hostname is left as None here; update_provisioning_data fills it in from the VM's IPs.
+        return JobProvisioningData(
+            backend=instance_offer.backend,
+            instance_type=instance_offer.instance,
+            instance_id=vm_id,
+            hostname=None,
+            region=region,
+            price=instance_offer.price,
+            ssh_port=22,
+            username="ubuntu",
+            dockerized=True,
+ 
backend_data=CrusoeInstanceBackendData(data_disk_id=data_disk_id).json(), + ) + + def update_provisioning_data( + self, provisioning_data, project_ssh_public_key, project_ssh_private_key + ): + try: + vm = self._client.get_vm(provisioning_data.instance_id) + except Exception: + return + interfaces = vm.get("network_interfaces", []) + if not interfaces: + return + ips = interfaces[0].get("ips", []) + if not ips: + return + public_ipv4 = ips[0].get("public_ipv4", {}) + private_ipv4 = ips[0].get("private_ipv4", {}) + if public_ipv4.get("address"): + provisioning_data.hostname = public_ipv4["address"] + if private_ipv4.get("address"): + provisioning_data.internal_ip = private_ipv4["address"] + + def terminate_instance( + self, instance_id: str, region: str, backend_data: Optional[str] = None + ): + backend_data_parsed = CrusoeInstanceBackendData.load(backend_data) + try: + vm = self._client.get_vm(instance_id) + except BackendError: + # VM not found (404) or other API error -- treat as already deleted + vm = None + + if vm is not None: + state = vm.get("state", "") + if state not in ("STATE_DELETING", "STATE_DELETED"): + try: + self._client.delete_vm(instance_id) + except BackendError: + pass + raise NotYetTerminated(f"Requested VM deletion. State was: {state}") + else: + raise NotYetTerminated(f"Waiting for VM deletion. State: {state}") + + # OS disk is auto-deleted with the VM. Data disk must be deleted separately. 
+ if backend_data_parsed.data_disk_id: + try: + self._client.delete_disk(backend_data_parsed.data_disk_id) + except BackendError: + pass + + def create_placement_group( + self, + placement_group: PlacementGroup, + master_instance_offer: InstanceOffer, + ) -> PlacementGroupProvisioningData: + assert placement_group.configuration.placement_strategy == PlacementStrategy.CLUSTER + instance_name = master_instance_offer.instance.name + region = placement_group.configuration.region + + if not _is_ib_type(instance_name): + return PlacementGroupProvisioningData( + backend=BackendType.CRUSOE, + backend_data=CrusoePlacementGroupBackendData( + ib_partition_id=None, ib_network_id=None + ).json(), + ) + + ib_networks = self._client.list_ib_networks() + target_network = None + for net in ib_networks: + if net.get("location") != region: + continue + for cap in net.get("capacities", []): + if cap.get("slice_type") == instance_name: + target_network = net + break + if target_network: + break + + if target_network is None: + raise BackendError( + f"No IB network found in {region} for instance type {instance_name}" + ) + + partition = self._client.create_ib_partition( + name=placement_group.name, + ib_network_id=target_network["id"], + ) + return PlacementGroupProvisioningData( + backend=BackendType.CRUSOE, + backend_data=CrusoePlacementGroupBackendData( + ib_partition_id=partition["id"], + ib_network_id=target_network["id"], + ).json(), + ) + + def delete_placement_group(self, placement_group: PlacementGroup) -> None: + assert placement_group.provisioning_data is not None + pg_data = CrusoePlacementGroupBackendData.load( + placement_group.provisioning_data.backend_data + ) + if pg_data.ib_partition_id: + try: + self._client.delete_ib_partition(pg_data.ib_partition_id) + except BackendError: + pass + + def is_suitable_placement_group( + self, + placement_group: PlacementGroup, + instance_offer: InstanceOffer, + ) -> bool: + if placement_group.configuration.region != 
instance_offer.region:
+            return False
+        assert placement_group.provisioning_data is not None
+        pg_data = CrusoePlacementGroupBackendData.load(
+            placement_group.provisioning_data.backend_data
+        )
+        # A group without an IB partition only fits non-IB types; one with a partition
+        # only fits IB/RoCE types.
+        if pg_data.ib_partition_id is None:
+            return not _is_ib_type(instance_offer.instance.name)
+        return _is_ib_type(instance_offer.instance.name)
+
+
+class CrusoeInstanceBackendData(CoreModel):
+    """Per-instance data stored as JSON in `JobProvisioningData.backend_data`."""
+
+    # ID of the separately created data disk; None when no data disk was created.
+    data_disk_id: Optional[str] = None
+
+    @classmethod
+    def load(cls, raw: Optional[str]) -> "CrusoeInstanceBackendData":
+        """Parse the JSON string `raw`; return a default instance when `raw` is None."""
+        if raw is None:
+            return cls()
+        return cls.__response__.parse_raw(raw)
+
+
+class CrusoePlacementGroupBackendData(CoreModel):
+    """Placement-group data stored as JSON in `PlacementGroupProvisioningData.backend_data`."""
+
+    # Both are None for placement groups created for non-IB instance types.
+    ib_partition_id: Optional[str] = None
+    ib_network_id: Optional[str] = None
+
+    @classmethod
+    def load(cls, raw: Optional[str]) -> "CrusoePlacementGroupBackendData":
+        """Parse the JSON string `raw`; return a default instance when `raw` is None."""
+        if raw is None:
+            return cls()
+        return cls.__response__.parse_raw(raw)
diff --git a/src/dstack/_internal/core/backends/crusoe/configurator.py b/src/dstack/_internal/core/backends/crusoe/configurator.py
new file mode 100644
index 0000000000..95f805458e
--- /dev/null
+++ b/src/dstack/_internal/core/backends/crusoe/configurator.py
@@ -0,0 +1,78 @@
+import json
+
+from dstack._internal.core.backends.base.configurator import (
+    BackendRecord,
+    Configurator,
+    raise_invalid_credentials_error,
+)
+from dstack._internal.core.backends.crusoe.backend import CrusoeBackend
+from dstack._internal.core.backends.crusoe.models import (
+    CrusoeBackendConfig,
+    CrusoeBackendConfigWithCreds,
+    CrusoeConfig,
+    CrusoeCreds,
+    CrusoeStoredConfig,
+)
+from dstack._internal.core.backends.crusoe.resources import CrusoeClient
+from dstack._internal.core.models.backends.base import BackendType
+
+
+class CrusoeConfigurator(
+    Configurator[
+        CrusoeBackendConfig,
+        CrusoeBackendConfigWithCreds,
+    ]
+):
+    """Validates Crusoe backend configs and converts them to and from `BackendRecord`."""
+
+    TYPE = BackendType.CRUSOE
+    BACKEND_CLASS = CrusoeBackend
+
+    def validate_config(self, config: CrusoeBackendConfigWithCreds, default_creds_enabled: bool):
+        # Credentials are checked by making a real API call (list_quotas) with them.
+        try:
+            client = 
CrusoeClient(config.creds, config.project_id)
+            client.list_quotas()
+        except Exception as e:
+            # Any failure of the probe call is reported against the `creds` field.
+            raise_invalid_credentials_error(
+                fields=[["creds"]],
+                details=str(e),
+            )
+        if config.regions:
+            try:
+                available = set(client.list_locations())
+            except Exception:
+                # Locations could not be listed: skip region validation instead of failing.
+                return
+            invalid = set(config.regions) - available
+            if invalid:
+                raise_invalid_credentials_error(
+                    fields=[["regions"]],
+                    details=(
+                        f"Unknown regions: {sorted(invalid)}. Valid regions: {sorted(available)}"
+                    ),
+                )
+
+    def create_backend(
+        self, project_name: str, config: CrusoeBackendConfigWithCreds
+    ) -> BackendRecord:
+        """Split the config into a record: non-credential fields in `config`, creds in `auth`."""
+        return BackendRecord(
+            config=CrusoeStoredConfig(
+                **CrusoeBackendConfig.__response__.parse_obj(config).dict()
+            ).json(),
+            auth=CrusoeCreds.parse_obj(config.creds).json(),
+        )
+
+    def get_backend_config_with_creds(self, record: BackendRecord) -> CrusoeBackendConfigWithCreds:
+        """Rebuild the config, including credentials, from a stored record."""
+        config = self._get_config(record)
+        return CrusoeBackendConfigWithCreds.__response__.parse_obj(config)
+
+    def get_backend_config_without_creds(self, record: BackendRecord) -> CrusoeBackendConfig:
+        """Rebuild the config from a stored record as a `CrusoeBackendConfig` (no `creds` field)."""
+        config = self._get_config(record)
+        return CrusoeBackendConfig.__response__.parse_obj(config)
+
+    def get_backend(self, record: BackendRecord) -> CrusoeBackend:
+        """Instantiate `CrusoeBackend` from a stored record."""
+        config = self._get_config(record)
+        return CrusoeBackend(config=config)
+
+    def _get_config(self, record: BackendRecord) -> CrusoeConfig:
+        """Combine the record's JSON config with its JSON-encoded credentials."""
+        return CrusoeConfig.__response__(
+            **json.loads(record.config),
+            creds=CrusoeCreds.parse_raw(record.auth),
+        )
diff --git a/src/dstack/_internal/core/backends/crusoe/models.py b/src/dstack/_internal/core/backends/crusoe/models.py
new file mode 100644
index 0000000000..f405eca1b7
--- /dev/null
+++ b/src/dstack/_internal/core/backends/crusoe/models.py
@@ -0,0 +1,48 @@
+from typing import Annotated, List, Literal, Optional, Union
+
+from pydantic import Field
+
+from dstack._internal.core.models.common import CoreModel
+
+
+class CrusoeAccessKeyCreds(CoreModel):
+    # Access-key credentials for the Crusoe API; the client uses them to sign requests.
+    type: Annotated[Literal["access_key"], 
Field(description="The type of credentials")] = (
+        "access_key"
+    )
+    access_key: Annotated[str, Field(description="The Crusoe API access key")]
+    secret_key: Annotated[str, Field(description="The Crusoe API secret key")]
+
+
+# Access-key credentials are the only credential type defined here, so both aliases
+# point at the same model.
+AnyCrusoeCreds = CrusoeAccessKeyCreds
+CrusoeCreds = AnyCrusoeCreds
+
+
+class CrusoeBackendConfig(CoreModel):
+    """Crusoe backend settings without credentials."""
+
+    type: Annotated[
+        Literal["crusoe"],
+        Field(description="The type of backend"),
+    ] = "crusoe"
+    project_id: Annotated[str, Field(description="The Crusoe Cloud project ID")]
+    regions: Annotated[
+        Optional[List[str]],
+        Field(description="The list of allowed Crusoe regions. Omit to use all regions"),
+    ] = None
+
+
+class CrusoeBackendConfigWithCreds(CrusoeBackendConfig):
+    """Crusoe backend settings plus credentials."""
+
+    creds: Annotated[AnyCrusoeCreds, Field(description="The credentials")]
+
+
+AnyCrusoeBackendConfig = Union[CrusoeBackendConfig, CrusoeBackendConfigWithCreds]
+
+
+class CrusoeBackendFileConfigWithCreds(CrusoeBackendConfig):
+    """Config-file variant; declares the same fields as `CrusoeBackendConfigWithCreds`."""
+
+    creds: Annotated[AnyCrusoeCreds, Field(description="The credentials")]
+
+
+class CrusoeStoredConfig(CrusoeBackendConfig):
+    # Adds no fields to CrusoeBackendConfig; the configurator serializes it into
+    # BackendRecord.config.
+    pass
+
+
+class CrusoeConfig(CrusoeStoredConfig):
+    """Stored settings plus credentials; the object handed to `CrusoeBackend`."""
+
+    creds: AnyCrusoeCreds
diff --git a/src/dstack/_internal/core/backends/crusoe/resources.py b/src/dstack/_internal/core/backends/crusoe/resources.py
new file mode 100644
index 0000000000..1f84ff4019
--- /dev/null
+++ b/src/dstack/_internal/core/backends/crusoe/resources.py
@@ -0,0 +1,198 @@
+import base64
+import datetime
+import hashlib
+import hmac
+import time
+from typing import Any, Dict, List, Optional
+
+import requests
+
+from dstack._internal.core.backends.crusoe.models import CrusoeAccessKeyCreds
+from dstack._internal.core.errors import BackendError, NoCapacityError, ProvisioningError
+from dstack._internal.utils.logging import get_logger
+
+logger = get_logger(__name__)
+
+API_URL = "https://api.crusoecloud.com"
+# Path prefix; it is part of both the request URL and the signed payload.
+API_VERSION = "/v1alpha5"
+# Sent as the first component of the Authorization bearer value.
+SIGNATURE_VERSION = "1.0"
+# Passed as `timeout` to requests.request for every call.
+REQUEST_TIMEOUT = 30
+
+
+class CrusoeClient:
+    # Thin HTTP client for the Crusoe Cloud REST API; every request is HMAC-signed.
+    def __init__(self, creds: 
CrusoeAccessKeyCreds, project_id: str):
+        """Keep the key pair and project ID used to sign and scope requests."""
+        self.access_key = creds.access_key
+        self.secret_key = creds.secret_key
+        self.project_id = project_id
+
+    def _request(
+        self,
+        method: str,
+        path: str,
+        params: Optional[dict] = None,
+        body: Optional[dict] = None,
+    ) -> requests.Response:
+        """Send a signed request to `API_URL + API_VERSION + path`.
+
+        The signature is an HMAC-SHA256 over
+        "<version+path>\\n<query>\\n<method>\\n<timestamp>\\n", keyed with the
+        base64url-decoded secret key. Responses with status >= 400 are turned into
+        exceptions by `_raise_api_error`.
+        """
+        # UTC timestamp with second precision, e.g. "2024-01-01T00:00:00+00:00".
+        dt = str(datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0))
+        dt = dt.replace(" ", "T")
+
+        # NOTE(review): the signed query string is built without URL-encoding, while
+        # `requests` encodes `params` in the URL; values with special characters may
+        # produce a signature mismatch -- confirm before passing such params.
+        query_string = ""
+        if params:
+            query_string = "&".join(f"{k}={v}" for k, v in sorted(params.items()))
+
+        payload = f"{API_VERSION}{path}\n{query_string}\n{method}\n{dt}\n"
+
+        # Re-add the "=" padding that urlsafe_b64decode needs if the key was stored without it.
+        decoded_secret = base64.urlsafe_b64decode(
+            self.secret_key + "=" * (-len(self.secret_key) % 4)
+        )
+        sig = hmac.new(decoded_secret, msg=payload.encode("ascii"), digestmod=hashlib.sha256)
+        encoded_sig = base64.urlsafe_b64encode(sig.digest()).decode("ascii").rstrip("=")
+
+        headers = {
+            "X-Crusoe-Timestamp": dt,
+            "Authorization": f"Bearer {SIGNATURE_VERSION}:{self.access_key}:{encoded_sig}",
+        }
+        if body is not None:
+            headers["Content-Type"] = "application/json"
+
+        url = f"{API_URL}{API_VERSION}{path}"
+        resp = requests.request(
+            method, url, headers=headers, params=params, json=body, timeout=REQUEST_TIMEOUT
+        )
+        if resp.status_code >= 400:
+            _raise_api_error(resp)
+        return resp
+
+    def _project_path(self, path: str) -> str:
+        """Prefix `path` with this client's `/projects/<project_id>` scope."""
+        return f"/projects/{self.project_id}{path}"
+
+    # --- VM operations ---
+
+    def create_vm(
+        self,
+        name: str,
+        vm_type: str,
+        location: str,
+        ssh_public_key: str,
+        image: str,
+        startup_script: str,
+        disks: Optional[List[Dict[str, str]]] = None,
+        host_channel_adapters: Optional[List[Dict[str, str]]] = None,
+    ) -> dict:
+        """POST a VM creation request and return the "operation" object of the response."""
+        body: Dict[str, Any] = {
+            "name": name,
+            "type": vm_type,
+            "location": location,
+            "ssh_public_key": ssh_public_key,
+            "image": image,
+            "startup_script": startup_script,
+        }
+        # Optional sections are only sent when non-empty.
+        if disks:
+            body["disks"] = disks
+        if host_channel_adapters:
+            body["host_channel_adapters"] = host_channel_adapters
+        resp = self._request("POST", 
self._project_path("/compute/vms/instances"), body=body)
+        return resp.json()["operation"]
+
+    def get_vm(self, vm_id: str) -> dict:
+        """GET a VM by ID and return the decoded JSON response."""
+        resp = self._request("GET", self._project_path(f"/compute/vms/instances/{vm_id}"))
+        return resp.json()
+
+    def delete_vm(self, vm_id: str) -> dict:
+        """Request VM deletion and return the "operation" object of the response."""
+        resp = self._request("DELETE", self._project_path(f"/compute/vms/instances/{vm_id}"))
+        return resp.json()["operation"]
+
+    def get_vm_operation(self, operation_id: str) -> dict:
+        """GET a VM operation by ID and return the decoded JSON response."""
+        resp = self._request(
+            "GET", self._project_path(f"/compute/vms/instances/operations/{operation_id}")
+        )
+        return resp.json()
+
+    # --- Disk operations ---
+
+    def create_disk(self, name: str, size: str, location: str) -> dict:
+        """Request a "persistent-ssd" disk and return the "operation" object of the response.
+
+        `size` is sent as given; the compute code in this backend passes strings
+        such as "100GiB".
+        """
+        body = {
+            "name": name,
+            "size": size,
+            "location": location,
+            "type": "persistent-ssd",
+            "block_size": 4096,
+        }
+        resp = self._request("POST", self._project_path("/storage/disks"), body=body)
+        return resp.json()["operation"]
+
+    def delete_disk(self, disk_id: str) -> dict:
+        """Request disk deletion and return the "operation" object of the response."""
+        resp = self._request("DELETE", self._project_path(f"/storage/disks/{disk_id}"))
+        return resp.json()["operation"]
+
+    def get_disk_operation(self, operation_id: str) -> dict:
+        """GET a disk operation by ID and return the decoded JSON response."""
+        resp = self._request(
+            "GET", self._project_path(f"/storage/disks/operations/{operation_id}")
+        )
+        return resp.json()
+
+    # --- Quota operations ---
+
+    def list_quotas(self) -> List[dict]:
+        """Return the "quotas" list of the project's quota response ([] if absent)."""
+        resp = self._request("GET", self._project_path("/quotas"))
+        return resp.json().get("quotas", [])
+
+    # --- Location operations ---
+
+    def list_locations(self) -> List[str]:
+        """Return the "items" list of the `/locations` response ([] if absent).
+
+        Unlike the other calls, this path is not project-scoped.
+        """
+        resp = self._request("GET", "/locations")
+        return resp.json().get("items", [])
+
+    # --- IB operations ---
+
+    def list_ib_networks(self) -> List[dict]:
+        """Return the "items" list of the project's IB networks response ([] if absent)."""
+        resp = self._request("GET", self._project_path("/networking/ib-networks"))
+        return resp.json().get("items", [])
+
+    def create_ib_partition(self, name: str, ib_network_id: str) -> dict:
+        """Create an IB partition in the given network and return the decoded JSON response."""
+        body = {"name": name, "ib_network_id": ib_network_id}
+        resp = self._request("POST", 
self._project_path("/networking/ib-partitions"), body=body) + return resp.json() + + def delete_ib_partition(self, partition_id: str) -> None: + self._request("DELETE", self._project_path(f"/networking/ib-partitions/{partition_id}")) + + # --- Operation polling --- + + def wait_for_vm_operation( + self, operation_id: str, timeout: float = 120, interval: float = 5 + ) -> dict: + return self._wait_for_operation(operation_id, self.get_vm_operation, timeout, interval) + + def wait_for_disk_operation( + self, operation_id: str, timeout: float = 30, interval: float = 2 + ) -> dict: + return self._wait_for_operation(operation_id, self.get_disk_operation, timeout, interval) + + def _wait_for_operation(self, operation_id, get_fn, timeout, interval) -> dict: + deadline = time.monotonic() + timeout + while True: + op = get_fn(operation_id) + state = op.get("state", op.get("operation", {}).get("state")) + if state == "SUCCEEDED": + return op + if state == "FAILED": + result = op.get("result", {}) + code = result.get("code", "") + message = result.get("message", str(result)) + if code == "out_of_stock": + raise NoCapacityError(message) + raise ProvisioningError(f"Operation {operation_id} failed: {message}") + if time.monotonic() + interval > deadline: + raise BackendError(f"Operation {operation_id} timed out (state: {state})") + time.sleep(interval) + + +def _raise_api_error(resp: requests.Response) -> None: + try: + data = resp.json() + message = data.get("message", data.get("error", str(data))) + except Exception: + message = resp.text[:500] + if resp.status_code == 404: + raise BackendError(f"Resource not found: {message}") + raise BackendError(f"Crusoe API error ({resp.status_code}): {message}") diff --git a/src/dstack/_internal/core/backends/models.py b/src/dstack/_internal/core/backends/models.py index f1c59e2f44..36a7856e38 100644 --- a/src/dstack/_internal/core/backends/models.py +++ b/src/dstack/_internal/core/backends/models.py @@ -12,6 +12,11 @@ 
CloudRiftBackendConfig, CloudRiftBackendConfigWithCreds, ) +from dstack._internal.core.backends.crusoe.models import ( + CrusoeBackendConfig, + CrusoeBackendConfigWithCreds, + CrusoeBackendFileConfigWithCreds, +) from dstack._internal.core.backends.cudo.models import ( CudoBackendConfig, CudoBackendConfigWithCreds, @@ -79,6 +84,7 @@ AWSBackendConfig, AzureBackendConfig, CloudRiftBackendConfig, + CrusoeBackendConfig, CudoBackendConfig, BaseDigitalOceanBackendConfig, GCPBackendConfig, @@ -103,6 +109,7 @@ AWSBackendConfigWithCreds, AzureBackendConfigWithCreds, CloudRiftBackendConfigWithCreds, + CrusoeBackendConfigWithCreds, CudoBackendConfigWithCreds, VerdaBackendConfigWithCreds, BaseDigitalOceanBackendConfigWithCreds, @@ -126,6 +133,7 @@ AWSBackendConfigWithCreds, AzureBackendConfigWithCreds, CloudRiftBackendConfigWithCreds, + CrusoeBackendFileConfigWithCreds, CudoBackendConfigWithCreds, VerdaBackendConfigWithCreds, BaseDigitalOceanBackendConfigWithCreds, diff --git a/src/dstack/_internal/core/models/backends/base.py b/src/dstack/_internal/core/models/backends/base.py index 82efe09efa..47552010d4 100644 --- a/src/dstack/_internal/core/models/backends/base.py +++ b/src/dstack/_internal/core/models/backends/base.py @@ -8,6 +8,7 @@ class BackendType(str, enum.Enum): AWS (BackendType): Amazon Web Services AZURE (BackendType): Microsoft Azure CLOUDRIFT (BackendType): CloudRift + CRUSOE (BackendType): Crusoe Cloud CUDO (BackendType): Cudo DATACRUNCH (BackendType): DataCrunch (for backward compatibility) DIGITALOCEAN (BackendType): DigitalOcean @@ -29,6 +30,7 @@ class BackendType(str, enum.Enum): AWS = "aws" AZURE = "azure" CLOUDRIFT = "cloudrift" + CRUSOE = "crusoe" CUDO = "cudo" DATACRUNCH = "datacrunch" # BackendType for backward compatibility DIGITALOCEAN = "digitalocean" diff --git a/src/tests/_internal/server/routers/test_backends.py b/src/tests/_internal/server/routers/test_backends.py index 433c12de30..66c7f4ea36 100644 --- 
a/src/tests/_internal/server/routers/test_backends.py +++ b/src/tests/_internal/server/routers/test_backends.py @@ -87,6 +87,7 @@ async def test_returns_backend_types(self, client: AsyncClient): "aws", "azure", "cloudrift", + "crusoe", "cudo", *(["datacrunch"] if sys.version_info >= (3, 10) else []), "digitalocean", diff --git a/src/tests/_internal/server/services/test_backend_configs.py b/src/tests/_internal/server/services/test_backend_configs.py index 9ac2e3bbd8..455b38c6e4 100644 --- a/src/tests/_internal/server/services/test_backend_configs.py +++ b/src/tests/_internal/server/services/test_backend_configs.py @@ -13,6 +13,47 @@ )
+class TestCrusoeBackendConfig:
+    def test_config_parsing(self, tmp_path: Path):
+        """A `crusoe` backend entry in config.yml loads and converts via file_config_to_config."""
+        config_yaml_path = tmp_path / "config.yml"
+        config_dict = {
+            "projects": [
+                {
+                    "name": "main",
+                    "backends": [
+                        {
+                            "type": "crusoe",
+                            "project_id": "test-project-id",
+                            "regions": ["us-east1-a"],
+                            "creds": {
+                                "type": "access_key",
+                                "access_key": "test-access-key",
+                                "secret_key": "test-secret-key",
+                            },
+                        }
+                    ],
+                }
+            ]
+        }
+        config_yaml_path.write_text(yaml.dump(config_dict))
+
+        # Point the server at the temporary config file only while loading it.
+        with patch.object(settings, "SERVER_CONFIG_FILE_PATH", config_yaml_path):
+            m = ServerConfigManager()
+            assert m.load_config()
+        assert m.config is not None
+        assert m.config.projects is not None
+        assert len(m.config.projects) > 0
+        assert m.config.projects[0].backends is not None
+        backend_file_cfg = m.config.projects[0].backends[0]
+        backend_cfg = file_config_to_config(backend_file_cfg)
+
+        # Every field written to the YAML must survive the conversion unchanged.
+        assert backend_cfg.type == "crusoe"
+        assert backend_cfg.project_id == "test-project-id"
+        assert backend_cfg.regions == ["us-east1-a"]
+        assert backend_cfg.creds.access_key == "test-access-key"
+        assert backend_cfg.creds.secret_key == "test-secret-key"
+
+
 @pytest.mark.skipif(sys.version_info < (3, 10), reason="Nebius requires Python 3.10") class TestNebiusBackendConfig: def test_with_filename(self, tmp_path: Path):