Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 70 additions & 6 deletions internal/cephfs/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,7 @@ func (cs *ControllerServer) CreateSnapshot(
info, err := volClient.GetSubVolumeInfo(ctx)
if err != nil {
if sid != nil {
errDefer := store.UndoSnapReservation(ctx, parentVolOptions, *sid, snapName, cr)
errDefer := store.UndoSnapReservation(ctx, parentVolOptions, sid, snapName, cr)
if errDefer != nil {
log.WarningLog(ctx, "failed undoing reservation of snapshot: %s (%s)",
requestName, errDefer)
Expand Down Expand Up @@ -879,13 +879,13 @@ func (cs *ControllerServer) CreateSnapshot(
}

// Reservation
sID, err := store.ReserveSnap(ctx, parentVolOptions, vid.FsSubvolName, cephfsSnap, cr)
sID, err := store.ReserveSnap(ctx, parentVolOptions, vid.FsSubvolName, sourceVolID, cephfsSnap, cr)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer func() {
if err != nil {
errDefer := store.UndoSnapReservation(ctx, parentVolOptions, *sID, snapName, cr)
errDefer := store.UndoSnapReservation(ctx, parentVolOptions, sID, snapName, cr)
if errDefer != nil {
log.WarningLog(ctx, "failed undoing reservation of snapshot: %s (%s)",
requestName, errDefer)
Expand Down Expand Up @@ -1033,7 +1033,7 @@ func (cs *ControllerServer) DeleteSnapshot(
// success as deletion is complete
return &csi.DeleteSnapshotResponse{}, nil
case errors.Is(err, cerrors.ErrSnapNotFound):
err = store.UndoSnapReservation(ctx, volOpt, *sid, sid.RequestName, cr)
err = store.UndoSnapReservation(ctx, volOpt, sid, sid.RequestName, cr)
if err != nil {
log.ErrorLog(ctx, "failed to remove reservation for snapname (%s) with backing snap (%s) (%s)",
sid.RequestName, sid.FsSnapshotName, err)
Expand All @@ -1046,7 +1046,7 @@ func (cs *ControllerServer) DeleteSnapshot(
// if the error is ErrVolumeNotFound, the subvolume is already deleted
// from backend, Hence undo the omap entries and return success
log.ErrorLog(ctx, "Volume not present")
err = store.UndoSnapReservation(ctx, volOpt, *sid, sid.RequestName, cr)
err = store.UndoSnapReservation(ctx, volOpt, sid, sid.RequestName, cr)
if err != nil {
log.ErrorLog(ctx, "failed to remove reservation for snapname (%s) with backing snap (%s) (%s)",
sid.RequestName, sid.FsSnapshotName, err)
Expand Down Expand Up @@ -1113,7 +1113,7 @@ func deleteSnapshotAndUndoReservation(
return err
}

err = store.UndoSnapReservation(ctx, parentVolOptions, *snapID, snapID.RequestName, cr)
err = store.UndoSnapReservation(ctx, parentVolOptions, snapID, snapID.RequestName, cr)
if err != nil {
log.ErrorLog(ctx, "failed to remove reservation for snapname (%s) with backing snap (%s) (%s)",
snapID.RequestName, snapID.RequestName, err)
Expand All @@ -1124,6 +1124,70 @@ func deleteSnapshotAndUndoReservation(
return nil
}

// GetSnapshot implements the CSI GetSnapshot RPC and reports the current
// state of the snapshot identified by the request's snapshot ID.
//
// Error mapping:
//   - the capability validation error is returned unchanged;
//   - an empty snapshot ID or unusable credentials yield InvalidArgument;
//   - a lookup failing with ErrInvalidVolID, ErrPoolNotFound, ErrKeyNotFound,
//     ErrSnapNotFound or ErrVolumeNotFound yields NotFound, any other lookup
//     failure yields Internal;
//   - a snapshot without a stored source volume ID is backfilled through
//     store.BackfillSourceVolumeID, and a failed backfill yields NotFound.
func (cs *ControllerServer) GetSnapshot(
	ctx context.Context,
	req *csi.GetSnapshotRequest,
) (*csi.GetSnapshotResponse, error) {
	if err := cs.Driver.ValidateControllerServiceRequest(
		csi.ControllerServiceCapability_RPC_GET_SNAPSHOT); err != nil {
		log.ErrorLog(ctx, "invalid get snapshot req: %v", protosanitizer.StripSecrets(req))

		return nil, err
	}

	snapshotID := req.GetSnapshotId()
	if snapshotID == "" {
		return nil, status.Error(codes.InvalidArgument, "snapshot ID cannot be empty")
	}

	secrets := req.GetSecrets()

	cr, err := util.NewAdminCredentials(secrets)
	if err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}
	defer cr.DeleteCredentials()

	volOpt, snapInfo, sid, err := store.NewSnapshotOptionsFromID(ctx, snapshotID, cr, secrets, cs.ClusterName)
	if err != nil {
		// Every one of these sentinels is reported to the caller as
		// "snapshot not found"; anything else is an internal failure.
		missing := errors.Is(err, cerrors.ErrInvalidVolID) ||
			errors.Is(err, util.ErrPoolNotFound) ||
			errors.Is(err, util.ErrKeyNotFound) ||
			errors.Is(err, cerrors.ErrSnapNotFound) ||
			errors.Is(err, cerrors.ErrVolumeNotFound)
		if missing {
			return nil, status.Errorf(codes.NotFound, "snapshot %s not found: %v", snapshotID, err)
		}

		return nil, status.Error(codes.Internal, err.Error())
	}
	defer volOpt.Destroy()

	// The journal entry carries no source volume ID: try to reconstruct it.
	if sid.SourceVolumeID == "" {
		recoveredID, backfillErr := store.BackfillSourceVolumeID(ctx, volOpt, sid, cr)
		if backfillErr != nil {
			return nil, status.Errorf(codes.NotFound,
				"snapshot %s does not have source volume ID metadata and backfill failed: %v",
				snapshotID, backfillErr)
		}
		sid.SourceVolumeID = recoveredID
	}

	snapshot := &csi.Snapshot{
		SizeBytes:      volOpt.Size,
		SnapshotId:     snapshotID,
		SourceVolumeId: sid.SourceVolumeID,
		CreationTime:   timestamppb.New(snapInfo.CreatedAt),
		// Reported as not ready only while HasPendingClones is "yes".
		ReadyToUse: snapInfo.HasPendingClones != "yes",
	}

	return &csi.GetSnapshotResponse{Snapshot: snapshot}, nil
}

// ControllerPublishVolume implements the CSI ControllerPublishVolume RPC.
// It reads the service account restriction metadata from the backing CephFS
// subvolume and passes it to the node via publish context.
Expand Down
1 change: 1 addition & 0 deletions internal/cephfs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func (fs *cephfsDriver) Run(conf *util.Config) {
csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
csi.ControllerServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
csi.ControllerServiceCapability_RPC_GET_SNAPSHOT,
})
fs.cd.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{
csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
Expand Down
12 changes: 11 additions & 1 deletion internal/cephfs/store/fsjournal.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type SnapshotIdentifier struct {
RequestName string
CreationTime *timestamp.Timestamp
FsSubvolName string
SourceVolumeID string
}

/*
Expand Down Expand Up @@ -328,6 +329,7 @@ func ReserveSnap(
ctx context.Context,
volOptions *VolumeOptions,
parentSubVolName string,
sourceVolumeID string,
snap *SnapshotOption,
cr *util.Credentials,
) (*SnapshotIdentifier, error) {
Expand All @@ -354,6 +356,13 @@ func ReserveSnap(
return nil, err
}

if sourceVolumeID != "" {
err = j.StoreSourceVolumeID(ctx, volOptions.MetadataPool, imageUUID, sourceVolumeID)
if err != nil {
return nil, err
}
}

// generate the snapshot ID to return to the CO system
vid.SnapshotID, err = util.GenerateVolID(ctx, volOptions.Monitors, cr, volOptions.FscID,
"", volOptions.ClusterID, imageUUID)
Expand All @@ -371,7 +380,7 @@ func ReserveSnap(
func UndoSnapReservation(
ctx context.Context,
volOptions *VolumeOptions,
vid SnapshotIdentifier,
vid *SnapshotIdentifier,
snapName string,
cr *util.Credentials,
) error {
Expand Down Expand Up @@ -428,6 +437,7 @@ func CheckSnapExists(
snapUUID := snapData.ImageUUID
snapID := snapData.ImageAttributes.ImageName
sid.FsSnapshotName = snapData.ImageAttributes.ImageName
sid.SourceVolumeID = snapData.ImageAttributes.SourceVolumeID
snapClient := core.NewSnapshot(volOptions.conn, snapID,
volOptions.ClusterID, clusterName, &volOptions.SubVolume)
snapInfo, err := snapClient.GetSnapshotInfo(ctx)
Expand Down
68 changes: 68 additions & 0 deletions internal/cephfs/store/volumeoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/ceph/ceph-csi/internal/cephfs/core"
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
"github.com/ceph/ceph-csi/internal/journal"
kmsapi "github.com/ceph/ceph-csi/internal/kms"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/k8s"
Expand Down Expand Up @@ -882,6 +883,7 @@ func NewSnapshotOptionsFromID(
sid.RequestName = imageAttributes.RequestName
sid.FsSnapshotName = imageAttributes.ImageName
sid.FsSubvolName = imageAttributes.SourceName
sid.SourceVolumeID = imageAttributes.SourceVolumeID

volOptions.SubVolume.VolID = sid.FsSubvolName
volOptions.Owner = imageAttributes.Owner
Expand Down Expand Up @@ -911,6 +913,72 @@ func NewSnapshotOptionsFromID(
return &volOptions, &info, &sid, nil
}

// BackfillSourceVolumeID reconstructs and persists the source CSI volume ID
// for snapshots created before sourceVolumeID was stored in the OMAP.
//
// The object UUID is taken from sid.FsSubvolName by stripping
// journal.DefaultVolumeNamingPrefix; names without that prefix, or names
// that consist of the prefix alone, are rejected. The UUID is then looked up
// in the volume journal and the journal's image name must match
// sid.FsSubvolName before a volume ID is composed with util.GenerateVolID.
//
// Persisting the result in the snapshot journal is best-effort: a failure
// there is only logged and the computed ID is still returned with a nil
// error.
func BackfillSourceVolumeID(
	ctx context.Context,
	volOptions *VolumeOptions,
	sid *SnapshotIdentifier,
	cr *util.Credentials,
) (string, error) {
	if !strings.HasPrefix(sid.FsSubvolName, journal.DefaultVolumeNamingPrefix) {
		return "", fmt.Errorf(
			"cannot extract object UUID from subvolume name %q: custom naming prefix is not supported for backfill",
			sid.FsSubvolName)
	}

	objectUUID := strings.TrimPrefix(sid.FsSubvolName, journal.DefaultVolumeNamingPrefix)
	if objectUUID == "" {
		// A name equal to the bare prefix carries no UUID; fail here rather
		// than querying the journal with an empty object UUID.
		return "", fmt.Errorf(
			"cannot extract object UUID from subvolume name %q: no UUID follows the naming prefix",
			sid.FsSubvolName)
	}

	j, err := VolJournal.Connect(volOptions.Monitors, volOptions.RadosNamespace, cr)
	if err != nil {
		return "", fmt.Errorf("failed to connect to volume journal: %w", err)
	}
	defer j.Destroy()

	imageAttributes, err := j.GetImageAttributes(ctx, volOptions.MetadataPool, objectUUID, false)
	if err != nil {
		return "", fmt.Errorf("failed to look up parent volume UUID %q in journal: %w", objectUUID, err)
	}

	// Guard against a UUID that resolves to a different subvolume.
	if imageAttributes.ImageName != sid.FsSubvolName {
		return "", fmt.Errorf("parent volume name mismatch: journal has %q, snapshot references %q",
			imageAttributes.ImageName, sid.FsSubvolName)
	}

	sourceVolumeID, err := util.GenerateVolID(ctx, volOptions.Monitors, cr, volOptions.FscID,
		"", volOptions.ClusterID, objectUUID)
	if err != nil {
		return "", fmt.Errorf("failed to generate source volume ID: %w", err)
	}

	// Persist to the snapshot's OMAP so future calls don't need backfill.
	persistBackfilledSourceVolumeID(ctx, volOptions, sid, sourceVolumeID, cr)

	return sourceVolumeID, nil
}

// persistBackfilledSourceVolumeID stores sourceVolumeID in the snapshot
// journal entry addressed by the object UUID decoded from sid.SnapshotID.
// It never fails the caller: every error is logged as a warning.
func persistBackfilledSourceVolumeID(
	ctx context.Context,
	volOptions *VolumeOptions,
	sid *SnapshotIdentifier,
	sourceVolumeID string,
	cr *util.Credentials,
) {
	var vi util.CSIIdentifier
	if err := vi.DecomposeCSIID(sid.SnapshotID); err != nil {
		log.WarningLog(ctx, "failed to decompose snapshot ID for backfill persist: %v", err)

		return
	}

	snapJ, err := SnapJournal.Connect(volOptions.Monitors, volOptions.RadosNamespace, cr)
	if err != nil {
		log.WarningLog(ctx, "failed to persist backfilled source volume ID: %v", err)

		return
	}
	defer snapJ.Destroy()

	err = snapJ.StoreSourceVolumeID(ctx, volOptions.MetadataPool, vi.ObjectUUID, sourceVolumeID)
	if err != nil {
		log.WarningLog(ctx, "failed to persist backfilled source volume ID for snapshot %s: %v",
			sid.SnapshotID, err)

		return
	}

	log.DebugLog(ctx, "backfilled source volume ID %s for snapshot %s", sourceVolumeID, sid.SnapshotID)
}

// SnapshotOption is a struct that holds the information about the snapshot.
type SnapshotOption struct {
ReservedID string // ID reserved for the snapshot.
Expand Down
2 changes: 2 additions & 0 deletions internal/csi-common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ func getReqID(req any) string {
reqID = r.GetName()
case *csi.DeleteSnapshotRequest:
reqID = r.GetSnapshotId()
case *csi.GetSnapshotRequest:
reqID = r.GetSnapshotId()

case *csi.ControllerExpandVolumeRequest:
reqID = r.GetVolumeId()
Expand Down
29 changes: 29 additions & 0 deletions internal/journal/voljournal.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ single entity modifying the related omaps for a given VolName.
// Default name prefixes for volumes and snapshots.
const (
// defaultVolumeNamingPrefix is the package-internal default volume name prefix.
defaultVolumeNamingPrefix string = "csi-vol-"
// defaultSnapshotNamingPrefix is the package-internal default snapshot name prefix.
defaultSnapshotNamingPrefix string = "csi-snap-"

// DefaultVolumeNamingPrefix is the exported form of
// defaultVolumeNamingPrefix ("csi-vol-"), so that code outside this package
// can reference the default prefix for volume names.
DefaultVolumeNamingPrefix = defaultVolumeNamingPrefix
)

// CSIJournal defines the interface and the required key names for the above RADOS based OMaps.
Expand Down Expand Up @@ -163,6 +166,9 @@ type Config struct {
// backingSnapshotIDKey ID of the snapshot on which the CephFS snapshot-backed volume is based
backingSnapshotIDKey string

// sourceVolumeIDKey stores the CSI volume ID of the source volume for snapshots
sourceVolumeIDKey string

// commonPrefix is the prefix common to all omap keys for this Config
commonPrefix string
}
Expand All @@ -184,6 +190,7 @@ func NewCSIVolumeJournal(suffix string) *Config {
encryptionType: "csi.volume.encryptionType",
ownerKey: "csi.volume.owner",
backingSnapshotIDKey: "csi.volume.backingsnapshotid",
sourceVolumeIDKey: "",
commonPrefix: "csi.",
}
}
Expand All @@ -204,6 +211,7 @@ func NewCSISnapshotJournal(suffix string) *Config {
encryptKMSKey: "csi.volume.encryptKMS",
encryptionType: "csi.volume.encryptionType",
ownerKey: "csi.volume.owner",
sourceVolumeIDKey: "csi.sourceVolumeId",
commonPrefix: "csi.",
}
}
Expand Down Expand Up @@ -696,6 +704,7 @@ type ImageAttributes struct {
GroupID string // Contains the group id of the image
JournalPoolID int64 // Pool ID of the CSI journal pool, stored in big endian format (on-disk data)
BackingSnapshotID string // ID of the snapshot on which the CephFS snapshot-backed volume is based
SourceVolumeID string // Contains the CSI volume ID of the source volume, if it is a snapshot
}

// GetImageAttributes fetches all keys and their values, from a UUID directory, returning ImageAttributes structure.
Expand Down Expand Up @@ -727,6 +736,7 @@ func (conn *Connection) GetImageAttributes(
cj.ownerKey,
cj.backingSnapshotIDKey,
cj.csiGroupIDKey,
cj.sourceVolumeIDKey,
}
values, err := getOMapValues(
ctx, conn, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+objectUUID,
Expand Down Expand Up @@ -777,6 +787,10 @@ func (conn *Connection) GetImageAttributes(
return nil, fmt.Errorf("%w: no snap source in omap for %q",
util.ErrKeyNotFound, cj.cephUUIDDirectoryPrefix+objectUUID)
}

if cj.sourceVolumeIDKey != "" {
imageAttributes.SourceVolumeID = values[cj.sourceVolumeIDKey]
}
}

return imageAttributes, nil
Expand Down Expand Up @@ -816,6 +830,21 @@ func (conn *Connection) StoreGroupID(ctx context.Context, pool, reservedUUID, gr
return nil
}

// StoreSourceVolumeID records sourceVolumeID under the journal's
// sourceVolumeIDKey in the omap object named by the UUID directory prefix
// followed by reservedUUID.
//
// It returns an error when the journal configuration has no
// sourceVolumeIDKey, and wraps any error reported while writing the omap key.
func (conn *Connection) StoreSourceVolumeID(ctx context.Context, pool, reservedUUID, sourceVolumeID string) error {
	cfg := conn.config

	key := cfg.sourceVolumeIDKey
	if key == "" {
		return errors.New("sourceVolumeIDKey is not set for this journal")
	}

	omapName := cfg.cephUUIDDirectoryPrefix + reservedUUID
	entries := map[string]string{key: sourceVolumeID}

	if err := setOMapKeys(ctx, conn, pool, cfg.namespace, omapName, entries); err != nil {
		return fmt.Errorf("failed to store source volume ID: %w", err)
	}

	return nil
}

// FetchAttribute fetches an attribute (key) in omap.
func (conn *Connection) FetchAttribute(ctx context.Context, pool, reservedUUID, attribute string) (string, error) {
key := conn.config.commonPrefix + attribute
Expand Down
Loading
Loading