Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions internal/cephfs/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,24 @@ func (cs *ControllerServer) CreateVolume(
defer parentVol.Destroy()
}

// Lock the source volume or snapshot to prevent concurrent operations
if pvID != nil {
if acquired := cs.VolumeLocks.TryAcquire(pvID.VolumeID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, pvID.VolumeID)

return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, pvID.VolumeID)
}
defer cs.VolumeLocks.Release(pvID.VolumeID)
}
if sID != nil {
if acquired := cs.SnapshotLocks.TryAcquire(sID.SnapshotID); !acquired {
log.ErrorLog(ctx, util.SnapshotOperationAlreadyExistsFmt, sID.SnapshotID)

return nil, status.Errorf(codes.Aborted, util.SnapshotOperationAlreadyExistsFmt, sID.SnapshotID)
}
defer cs.SnapshotLocks.Release(sID.SnapshotID)
}

err = checkValidCreateVolumeRequest(volOptions, parentVol, pvID, sID, req)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
Expand Down
27 changes: 20 additions & 7 deletions internal/rbd/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,8 @@ func checkValidCreateVolumeRequest(rbdVol, parentVol *rbdVolume, rbdSnap *rbdSna
}

// CreateVolume creates the volume in backend.
//
//nolint:gocyclo,cyclop // TODO: reduce complexity.
func (cs *ControllerServer) CreateVolume(
ctx context.Context,
req *csi.CreateVolumeRequest,
Expand Down Expand Up @@ -394,6 +396,24 @@ func (cs *ControllerServer) CreateVolume(
defer rbdSnap.Destroy(ctx)
}

// Lock the source volume or snapshot to prevent concurrent operations
if parentVol != nil {
if acquired := cs.VolumeLocks.TryAcquire(parentVol.VolID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, parentVol.VolID)

return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, parentVol.VolID)
}
defer cs.VolumeLocks.Release(parentVol.VolID)
}
if rbdSnap != nil {
if acquired := cs.SnapshotLocks.TryAcquire(rbdSnap.VolID); !acquired {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CreateVolumeFromSnapshot will fail with operation already exists since we already have locks there?

func (cs *ControllerServer) createVolumeFromSnapshot(
ctx context.Context,
cr *util.Credentials,
secrets map[string]string,
rbdVol *rbdVolume,
snapshotID string,
) error {
if acquired := cs.SnapshotLocks.TryAcquire(snapshotID); !acquired {
log.ErrorLog(ctx, util.SnapshotOperationAlreadyExistsFmt, snapshotID)
return status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, snapshotID)
}
defer cs.SnapshotLocks.Release(snapshotID)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, I would expect the same indeed too. Will check it in more detail and correct it later.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved the locking all together, easier to follow the working that way.

log.ErrorLog(ctx, util.SnapshotOperationAlreadyExistsFmt, rbdSnap.VolID)

return nil, status.Errorf(codes.Aborted, util.SnapshotOperationAlreadyExistsFmt, rbdSnap.VolID)
}
defer cs.SnapshotLocks.Release(rbdSnap.VolID)
}

err = updateTopologyConstraints(rbdVol, rbdSnap)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
Expand Down Expand Up @@ -683,13 +703,6 @@ func (cs *ControllerServer) createVolumeFromSnapshot(
rbdVol *rbdVolume,
snapshotID string,
) error {
if acquired := cs.SnapshotLocks.TryAcquire(snapshotID); !acquired {
log.ErrorLog(ctx, util.SnapshotOperationAlreadyExistsFmt, snapshotID)

return status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, snapshotID)
}
defer cs.SnapshotLocks.Release(snapshotID)

rbdSnap, err := genSnapFromSnapID(ctx, snapshotID, cr, secrets)
if err != nil {
if errors.Is(err, util.ErrPoolNotFound) {
Expand Down