Skip to content

Commit fc70003

Browse files
authored
Merge pull request #147 from Azure/dev
Release 0.8.0
2 parents 33c102d + 3e58b0e commit fc70003

File tree

10 files changed

+184
-67
lines changed

10 files changed

+184
-67
lines changed

ChangeLog.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
> See [BreakingChanges](BreakingChanges.md) for a detailed list of API breaks.
44
5+
## Version 0.8.0:
6+
- Fixed error handling in high-level function DoBatchTransfer, and made it public for easy customization
7+
58
## Version 0.7.0:
69
- Added the ability to obtain User Delegation Keys (UDK)
710
- Added the ability to create User Delegation SAS tokens from UDKs

azblob/highlevel.go

Lines changed: 43 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func UploadBufferToBlockBlob(ctx context.Context, b []byte,
6666
if o.BlockSize == 0 {
6767
// If bufferSize > (BlockBlobMaxStageBlockBytes * BlockBlobMaxBlocks), then error
6868
if bufferSize > BlockBlobMaxStageBlockBytes*BlockBlobMaxBlocks {
69-
return nil, errors.New("Buffer is too large to upload to a block blob")
69+
return nil, errors.New("buffer is too large to upload to a block blob")
7070
}
7171
// If bufferSize <= BlockBlobMaxUploadBlobBytes, then Upload should be used with just 1 I/O request
7272
if bufferSize <= BlockBlobMaxUploadBlobBytes {
@@ -76,7 +76,7 @@ func UploadBufferToBlockBlob(ctx context.Context, b []byte,
7676
if o.BlockSize < BlobDefaultDownloadBlockSize { // If the block size is smaller than 4MB, round up to 4MB
7777
o.BlockSize = BlobDefaultDownloadBlockSize
7878
}
79-
// StageBlock will be called with blockSize blocks and a parallelism of (BufferSize / BlockSize).
79+
// StageBlock will be called with blockSize blocks and a Parallelism of (BufferSize / BlockSize).
8080
}
8181
}
8282

@@ -95,12 +95,12 @@ func UploadBufferToBlockBlob(ctx context.Context, b []byte,
9595
progress := int64(0)
9696
progressLock := &sync.Mutex{}
9797

98-
err := doBatchTransfer(ctx, batchTransferOptions{
99-
operationName: "UploadBufferToBlockBlob",
100-
transferSize: bufferSize,
101-
chunkSize: o.BlockSize,
102-
parallelism: o.Parallelism,
103-
operation: func(offset int64, count int64) error {
98+
err := DoBatchTransfer(ctx, BatchTransferOptions{
99+
OperationName: "UploadBufferToBlockBlob",
100+
TransferSize: bufferSize,
101+
ChunkSize: o.BlockSize,
102+
Parallelism: o.Parallelism,
103+
Operation: func(offset int64, count int64, ctx context.Context) error {
104104
// This function is called once per block.
105105
// It is passed this block's offset within the buffer and its count of bytes
106106
// Prepare to read the proper block/section of the buffer
@@ -198,12 +198,12 @@ func downloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, co
198198
progress := int64(0)
199199
progressLock := &sync.Mutex{}
200200

201-
err := doBatchTransfer(ctx, batchTransferOptions{
202-
operationName: "downloadBlobToBuffer",
203-
transferSize: count,
204-
chunkSize: o.BlockSize,
205-
parallelism: o.Parallelism,
206-
operation: func(chunkStart int64, count int64) error {
201+
err := DoBatchTransfer(ctx, BatchTransferOptions{
202+
OperationName: "downloadBlobToBuffer",
203+
TransferSize: count,
204+
ChunkSize: o.BlockSize,
205+
Parallelism: o.Parallelism,
206+
Operation: func(chunkStart int64, count int64, ctx context.Context) error {
207207
dr, err := blobURL.Download(ctx, chunkStart+offset, count, o.AccessConditions, false)
208208
if err != nil {
209209
return err
@@ -285,64 +285,69 @@ func DownloadBlobToFile(ctx context.Context, blobURL BlobURL, offset int64, coun
285285

286286
///////////////////////////////////////////////////////////////////////////////
287287

288-
// BatchTransferOptions identifies options used by doBatchTransfer.
289-
type batchTransferOptions struct {
290-
transferSize int64
291-
chunkSize int64
292-
parallelism uint16
293-
operation func(offset int64, chunkSize int64) error
294-
operationName string
288+
// BatchTransferOptions identifies options used by DoBatchTransfer.
289+
type BatchTransferOptions struct {
290+
TransferSize int64
291+
ChunkSize int64
292+
Parallelism uint16
293+
Operation func(offset int64, chunkSize int64, ctx context.Context) error
294+
OperationName string
295295
}
296296

297-
// doBatchTransfer helps to execute operations in a batch manner.
298-
func doBatchTransfer(ctx context.Context, o batchTransferOptions) error {
297+
// DoBatchTransfer helps to execute operations in a batch manner.
298+
// Can be used by users to customize batch works (for other scenarios that the SDK does not provide)
299+
func DoBatchTransfer(ctx context.Context, o BatchTransferOptions) error {
300+
if o.ChunkSize == 0 {
301+
return errors.New("ChunkSize cannot be 0")
302+
}
303+
299304
// Prepare and do parallel operations.
300-
numChunks := uint16(((o.transferSize - 1) / o.chunkSize) + 1)
301-
operationChannel := make(chan func() error, o.parallelism) // Create the channel that release 'parallelism' goroutines concurrently
305+
numChunks := uint16(((o.TransferSize - 1) / o.ChunkSize) + 1)
306+
operationChannel := make(chan func() error, o.Parallelism) // Create the channel that release 'Parallelism' goroutines concurrently
302307
operationResponseChannel := make(chan error, numChunks) // Holds each response
303308
ctx, cancel := context.WithCancel(ctx)
304309
defer cancel()
305310

306311
// Create the goroutines that process each operation (in parallel).
307-
if o.parallelism == 0 {
308-
o.parallelism = 5 // default parallelism
312+
if o.Parallelism == 0 {
313+
o.Parallelism = 5 // default Parallelism
309314
}
310-
for g := uint16(0); g < o.parallelism; g++ {
315+
for g := uint16(0); g < o.Parallelism; g++ {
311316
//grIndex := g
312317
go func() {
313318
for f := range operationChannel {
314-
//fmt.Printf("[%s] gr-%d start action\n", o.operationName, grIndex)
315319
err := f()
316320
operationResponseChannel <- err
317-
//fmt.Printf("[%s] gr-%d end action\n", o.operationName, grIndex)
318321
}
319322
}()
320323
}
321324

322325
// Add each chunk's operation to the channel.
323326
for chunkNum := uint16(0); chunkNum < numChunks; chunkNum++ {
324-
curChunkSize := o.chunkSize
327+
curChunkSize := o.ChunkSize
325328

326329
if chunkNum == numChunks-1 { // Last chunk
327-
curChunkSize = o.transferSize - (int64(chunkNum) * o.chunkSize) // Remove size of all transferred chunks from total
330+
curChunkSize = o.TransferSize - (int64(chunkNum) * o.ChunkSize) // Remove size of all transferred chunks from total
328331
}
329-
offset := int64(chunkNum) * o.chunkSize
332+
offset := int64(chunkNum) * o.ChunkSize
330333

331334
operationChannel <- func() error {
332-
return o.operation(offset, curChunkSize)
335+
return o.Operation(offset, curChunkSize, ctx)
333336
}
334337
}
335338
close(operationChannel)
336339

337340
// Wait for the operations to complete.
341+
var firstErr error = nil
338342
for chunkNum := uint16(0); chunkNum < numChunks; chunkNum++ {
339343
responseError := <-operationResponseChannel
340-
if responseError != nil {
341-
cancel() // As soon as any operation fails, cancel all remaining operation calls
342-
return responseError // No need to process anymore responses
344+
// record the first error (the original error which should cause the other chunks to fail with canceled context)
345+
if responseError != nil && firstErr == nil {
346+
cancel() // As soon as any operation fails, cancel all remaining operation calls
347+
firstErr = responseError
343348
}
344349
}
345-
return nil
350+
return firstErr
346351
}
347352

348353
////////////////////////////////////////////////////////////////////////////////////////////////

azblob/zt_examples_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ func ExampleNewPipeline() {
179179
// Send the request over the network
180180
resp, err := client.Do(request.WithContext(ctx))
181181

182-
return &httpResponse{response: resp}, err
182+
return pipeline.NewHTTPResponse(resp), err
183183
}
184184
}),
185185
}

azblob/zt_highlevel_test.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@ package azblob_test
22

33
import (
44
"context"
5+
"errors"
56
"io/ioutil"
67
"os"
8+
"sync/atomic"
9+
"time"
710

811
"github.com/Azure/azure-storage-blob-go/azblob"
912
chk "gopkg.in/check.v1"
@@ -329,3 +332,104 @@ func (s *aztestsSuite) TestDownloadBufferWithNonZeroOffsetAndCount(c *chk.C) {
329332
downloadCount := 6 * 1024
330333
performUploadAndDownloadBufferTest(c, blobSize, blockSize, parallelism, downloadOffset, downloadCount)
331334
}
335+
336+
func (s *aztestsSuite) TestBasicDoBatchTransfer(c *chk.C) {
337+
// test the basic multi-routine processing
338+
type testInstance struct {
339+
transferSize int64
340+
chunkSize int64
341+
parallelism uint16
342+
expectError bool
343+
}
344+
345+
testMatrix := []testInstance{
346+
{transferSize: 100, chunkSize: 10, parallelism: 5, expectError: false},
347+
{transferSize: 100, chunkSize: 9, parallelism: 4, expectError: false},
348+
{transferSize: 100, chunkSize: 8, parallelism: 15, expectError: false},
349+
{transferSize: 100, chunkSize: 1, parallelism: 3, expectError: false},
350+
{transferSize: 0, chunkSize: 100, parallelism: 5, expectError: false}, // empty file works
351+
{transferSize: 100, chunkSize: 0, parallelism: 5, expectError: true}, // 0 chunk size on the other hand must fail
352+
{transferSize: 0, chunkSize: 0, parallelism: 5, expectError: true},
353+
}
354+
355+
for _, test := range testMatrix {
356+
ctx := context.Background()
357+
// maintain some counts to make sure the right number of chunks were queued, and the total size is correct
358+
totalSizeCount := int64(0)
359+
runCount := int64(0)
360+
361+
err := azblob.DoBatchTransfer(ctx, azblob.BatchTransferOptions{
362+
TransferSize: test.transferSize,
363+
ChunkSize: test.chunkSize,
364+
Parallelism: test.parallelism,
365+
Operation: func(offset int64, chunkSize int64, ctx context.Context) error {
366+
atomic.AddInt64(&totalSizeCount, chunkSize)
367+
atomic.AddInt64(&runCount, 1)
368+
return nil
369+
},
370+
OperationName: "TestHappyPath",
371+
})
372+
373+
if test.expectError {
374+
c.Assert(err, chk.NotNil)
375+
} else {
376+
c.Assert(err, chk.IsNil)
377+
c.Assert(totalSizeCount, chk.Equals, test.transferSize)
378+
c.Assert(runCount, chk.Equals, ((test.transferSize-1)/test.chunkSize)+1)
379+
}
380+
}
381+
}
382+
383+
// mock a memory mapped file (low-quality mock, meant to simulate the scenario only)
384+
type mockMMF struct {
385+
isClosed bool
386+
failHandle *chk.C
387+
}
388+
389+
// accept input
390+
func (m *mockMMF) write(input string) {
391+
if m.isClosed {
392+
// simulate panic
393+
m.failHandle.Fail()
394+
}
395+
}
396+
397+
func (s *aztestsSuite) TestDoBatchTransferWithError(c *chk.C) {
398+
ctx := context.Background()
399+
mmf := mockMMF{failHandle: c}
400+
expectedFirstError := errors.New("#3 means trouble")
401+
402+
err := azblob.DoBatchTransfer(ctx, azblob.BatchTransferOptions{
403+
TransferSize: 5,
404+
ChunkSize: 1,
405+
Parallelism: 5,
406+
Operation: func(offset int64, chunkSize int64, ctx context.Context) error {
407+
// simulate doing some work (HTTP call in real scenarios)
408+
// later chunks later longer to finish
409+
time.Sleep(time.Second * time.Duration(offset))
410+
// simulate having gotten data and write it to the memory mapped file
411+
mmf.write("input")
412+
413+
// with one of the chunks, pretend like an error occurred (like the network connection breaks)
414+
if offset == 3 {
415+
return expectedFirstError
416+
} else if offset > 3 {
417+
// anything after offset=3 are canceled
418+
// so verify that the context indeed got canceled
419+
ctxErr := ctx.Err()
420+
c.Assert(ctxErr, chk.Equals, context.Canceled)
421+
return ctxErr
422+
}
423+
424+
// anything before offset=3 should be done without problem
425+
return nil
426+
},
427+
OperationName: "TestErrorPath",
428+
})
429+
430+
c.Assert(err, chk.Equals, expectedFirstError)
431+
432+
// simulate closing the mmf and make sure no panic occurs (as reported in #139)
433+
mmf.isClosed = true
434+
time.Sleep(time.Second * 5)
435+
}

azblob/zt_retry_reader_test.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,25 +27,25 @@ type perByteReader struct {
2727
injectedError error
2828

2929
// sleepDuraion and closeChannel are only use in "forced cancellation" tests
30-
sleepDuration time.Duration
31-
closeChannel chan struct{}
30+
sleepDuration time.Duration
31+
closeChannel chan struct{}
3232
}
3333

3434
func newPerByteReader(byteCount int) *perByteReader {
3535
perByteReader := perByteReader{
36-
byteCount: byteCount,
36+
byteCount: byteCount,
3737
closeChannel: nil,
3838
}
3939

4040
perByteReader.RandomBytes = make([]byte, byteCount)
41-
_,_ = rand.Read(perByteReader.RandomBytes)
41+
_, _ = rand.Read(perByteReader.RandomBytes)
4242

4343
return &perByteReader
4444
}
4545

4646
func newSingleUsePerByteReader(contents []byte) *perByteReader {
4747
perByteReader := perByteReader{
48-
byteCount: len(contents),
48+
byteCount: len(contents),
4949
closeChannel: make(chan struct{}, 10),
5050
}
5151

@@ -86,7 +86,7 @@ func (r *perByteReader) Close() error {
8686

8787
// Test normal retry succeed, note initial response not provided.
8888
// Tests both with and without notification of failures
89-
func (r *aztestsSuite) TestRetryReaderReadWithRetry(c *chk.C) {
89+
func (s *aztestsSuite) TestRetryReaderReadWithRetry(c *chk.C) {
9090
// Test twice, the second time using the optional "logging"/notification callback for failed tries
9191
// We must test both with and without the callback, since be testing without
9292
// we are testing that it is, indeed, optional to provide the callback
@@ -155,7 +155,7 @@ func (r *aztestsSuite) TestRetryReaderReadWithRetry(c *chk.C) {
155155
}
156156

157157
// Test normal retry fail as retry Count not enough.
158-
func (r *aztestsSuite) TestRetryReaderReadNegativeNormalFail(c *chk.C) {
158+
func (s *aztestsSuite) TestRetryReaderReadNegativeNormalFail(c *chk.C) {
159159
// Extra setup for testing notification of failures (i.e. of unsuccessful tries)
160160
failureMethodNumCalls := 0
161161
failureWillRetryCount := 0
@@ -210,7 +210,7 @@ func (r *aztestsSuite) TestRetryReaderReadNegativeNormalFail(c *chk.C) {
210210
}
211211

212212
// Test boundary case when Count equals to 0 and fail.
213-
func (r *aztestsSuite) TestRetryReaderReadCount0(c *chk.C) {
213+
func (s *aztestsSuite) TestRetryReaderReadCount0(c *chk.C) {
214214
byteCount := 1
215215
body := newPerByteReader(byteCount)
216216
body.doInjectError = true
@@ -243,7 +243,7 @@ func (r *aztestsSuite) TestRetryReaderReadCount0(c *chk.C) {
243243
c.Assert(err, chk.Equals, io.EOF)
244244
}
245245

246-
func (r *aztestsSuite) TestRetryReaderReadNegativeNonRetriableError(c *chk.C) {
246+
func (s *aztestsSuite) TestRetryReaderReadNegativeNonRetriableError(c *chk.C) {
247247
byteCount := 1
248248
body := newPerByteReader(byteCount)
249249
body.doInjectError = true
@@ -274,7 +274,7 @@ func (r *aztestsSuite) TestRetryReaderReadNegativeNonRetriableError(c *chk.C) {
274274
// purposes of unit testing, here we are testing the cancellation mechanism that is exposed to
275275
// consumers of the API, to allow programmatic forcing of retries (e.g. if the consumer deems
276276
// the read to be taking too long, they may force a retry in the hope of better performance next time).
277-
func (r *aztestsSuite) TestRetryReaderReadWithForcedRetry(c *chk.C) {
277+
func (s *aztestsSuite) TestRetryReaderReadWithForcedRetry(c *chk.C) {
278278

279279
for _, enableRetryOnEarlyClose := range []bool{false, true} {
280280

@@ -327,5 +327,4 @@ func (r *aztestsSuite) TestRetryReaderReadWithForcedRetry(c *chk.C) {
327327
}
328328
}
329329

330-
331330
// End testings for RetryReader

0 commit comments

Comments
 (0)