From 6bdd22ae36766edd819fa708f4888005fea0801e Mon Sep 17 00:00:00 2001 From: "brecht.vermeersch" Date: Tue, 25 Feb 2025 21:38:44 +0100 Subject: [PATCH 1/5] add async api --- src/Blob/BlobClient.php | 247 +++++++++++------- .../Models/BlobDownloadStreamingResult.php | 9 + 2 files changed, 168 insertions(+), 88 deletions(-) diff --git a/src/Blob/BlobClient.php b/src/Blob/BlobClient.php index 73ebf59..0711f76 100644 --- a/src/Blob/BlobClient.php +++ b/src/Blob/BlobClient.php @@ -24,9 +24,11 @@ use GuzzleHttp\Client; use GuzzleHttp\Exception\RequestException; use GuzzleHttp\Pool; +use GuzzleHttp\Promise\Promise; use GuzzleHttp\Promise\PromiseInterface; use GuzzleHttp\Psr7\Uri; use GuzzleHttp\Psr7\Utils as StreamUtils; +use Psr\Http\Message\ResponseInterface; use Psr\Http\Message\StreamInterface; use Psr\Http\Message\UriInterface; @@ -55,82 +57,131 @@ public function __construct( public function downloadStreaming(): BlobDownloadStreamingResult { - try { - $response = $this->client->get($this->uri, [ - 'stream' => true, - ]); + /** @phpstan-ignore-next-line */ + return $this->downloadStreamingAsync()->wait(); + } - return new BlobDownloadStreamingResult( - $response->getBody(), - BlobProperties::fromResponseHeaders($response), + public function downloadStreamingAsync(): PromiseInterface + { + return $this->client + ->getAsync($this->uri, [ + 'stream' => true, + ]) + ->then( + BlobDownloadStreamingResult::fromResponse(...), + fn(\Throwable $e) => throw $this->exceptionFactory->create($e), ); - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); - } } public function getProperties(): BlobProperties { - try { - $response = $this->client->head($this->uri); + /** @phpstan-ignore-next-line */ + return $this->getPropertiesAsync()->wait(); + } - return BlobProperties::fromResponseHeaders($response); - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); - } + public function getPropertiesAsync(): PromiseInterface + { + return $this->client + ->headAsync($this->uri) + ->then( + BlobProperties::fromResponseHeaders(...), + fn(\Throwable $e) => throw $this->exceptionFactory->create($e), + ); } /** * @param array $metadata - * @return void */ public function setMetadata(array $metadata): void { - try { - $this->client->put($this->uri, [ + $this->setMetadataAsync($metadata)->wait(); + } + + /** + * @param array $metadata + */ + public function setMetadataAsync(array $metadata): PromiseInterface + { + return $this->client + ->putAsync($this->uri, [ 'query' => [ 'comp' => 'metadata', ], 'headers' => MetadataHelper::metadataToHeaders($metadata), - ]); - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); - } + ]) + ->then( + null, + fn(\Throwable $e) => throw $this->exceptionFactory->create($e), + ); } public function delete(): void { - try { - $this->client->delete($this->uri); - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); - } + $this->deleteAsync()->wait(); + } + + public function deleteAsync(): PromiseInterface + { + return $this->client + ->deleteAsync($this->uri) + ->then( + null, + fn(\Throwable $e) => throw $this->exceptionFactory->create($e), + ); } public function deleteIfExists(): void { - try { - $this->delete(); - } catch (BlobNotFoundException) { - // do nothing - } + $this->deleteIfExistsAsync()->wait(); + } + + public function deleteIfExistsAsync(): PromiseInterface + { + return $this->deleteAsync()->then( + null, + function (\Throwable $e) { + if ($e instanceof BlobNotFoundException) { + return null; + } + + throw $e; + }, + ); } public function exists(): bool { - try { - $this->getProperties(); + /** @phpstan-ignore-next-line */ + return $this->existsAsync()->wait(); + } - return true; - } catch (BlobNotFoundException) { - return false; - } + public function existsAsync(): PromiseInterface + { + return $this->getPropertiesAsync() + ->then( + fn() => true, + function (\Throwable $e) { + if ($e instanceof BlobNotFoundException) { + return false; + } + + throw $e; + }, + ); } /** * @param string|resource|StreamInterface $content */ public function upload($content, ?UploadBlobOptions $options = null): void + { + $this->uploadAsync($content, $options)->wait(); + } + + /** + * @param string|resource|StreamInterface $content + */ + public function uploadAsync($content, ?UploadBlobOptions $options = null): PromiseInterface { if ($options === null) { $options = new UploadBlobOptions(); @@ -139,14 +190,15 @@ public function upload($content, ?UploadBlobOptions $options = null): void $content = $this->createUploadStream($content, $options); if ($content->getSize() === null || ! $content->isSeekable()) { - $this->uploadInSequentialBlocks($content, $options); + return $this->uploadInSequentialBlocksAsync($content, $options); } elseif ($content->getSize() > $options->initialTransferSize) { - $this->uploadInParallelBlocks($content, $options); + return $this->uploadInParallelBlocksAsync($content, $options); } else { - $this->uploadSingle($content, $options); + return $this->uploadSingleAsync($content, $options); } } + /** * @param string|resource|StreamInterface $content */ @@ -164,23 +216,24 @@ private function createUploadStream($content, UploadBlobOptions $options): Strea return StreamUtils::streamFor($content); } - private function uploadSingle(StreamInterface $content, UploadBlobOptions $options): void + private function uploadSingleAsync(StreamInterface $content, UploadBlobOptions $options): PromiseInterface { - try { - $this->client->put($this->uri, [ + return $this->client + ->putAsync($this->uri, [ 'headers' => [ 'x-ms-blob-type' => 'BlockBlob', 'Content-Type' => $options->contentType, 'Content-Length' => $content->getSize(), ], 'body' => $content, - ]); - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); - } + ]) + ->then( + null, + fn(\Throwable $e) => throw $this->exceptionFactory->create($e), + ); } - private function uploadInSequentialBlocks(StreamInterface $content, UploadBlobOptions $options): void + private function uploadInSequentialBlocksAsync(StreamInterface $content, UploadBlobOptions $options): PromiseInterface { $blocks = []; @@ -203,14 +256,14 @@ private function uploadInSequentialBlocks(StreamInterface $content, UploadBlobOp $contentMD5 = hash_final($contextMD5, true); - $this->putBlockList( + return $this->putBlockListAsync( $blocks, $options->contentType, $contentMD5, ); } - private function uploadInParallelBlocks(StreamInterface $content, UploadBlobOptions $options): void + private function uploadInParallelBlocksAsync(StreamInterface $content, UploadBlobOptions $options): PromiseInterface { $blocks = []; @@ -237,13 +290,16 @@ private function uploadInParallelBlocks(StreamInterface $content, UploadBlobOpti }, ]); - $pool->promise()->wait(); - - $this->putBlockList( - $blocks, - $options->contentType, - StreamUtils::hash($content, 'md5', true), - ); + return $pool + ->promise() + ->then( + fn() => $this->putBlockListAsync( + $blocks, + $options->contentType, + StreamUtils::hash($content, 'md5', true), + ), + fn(\Throwable $e) => throw $this->exceptionFactory->create($e), + ); } private function putBlockAsync(Block $block, StreamInterface|string $content): PromiseInterface @@ -264,10 +320,10 @@ private function putBlockAsync(Block $block, StreamInterface|string $content): P /** * @param Block[] $blocks */ - private function putBlockList(array $blocks, ?string $contentType, string $contentMD5): void + private function putBlockListAsync(array $blocks, ?string $contentType, string $contentMD5): PromiseInterface { - try { - $this->client->put($this->uri, [ + return $this->client + ->putAsync($this->uri, [ 'query' => [ 'comp' => 'blocklist', ], @@ -276,23 +332,26 @@ private function putBlockList(array $blocks, ?string $contentType, string $conte 'x-ms-blob-content-md5' => base64_encode($contentMD5), ], 'body' => (new PutBlockRequestBody($blocks))->toXml()->asXML(), - ]); - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); - } + ]); } public function copyFromUri(UriInterface $source): void { - try { - $this->client->put($this->uri, [ + $this->copyFromUriAsync($source)->wait(); + } + + public function copyFromUriAsync(UriInterface $source): PromiseInterface + { + return $this->client + ->putAsync($this->uri, [ 'headers' => [ 'x-ms-copy-source' => (string) $source, ], - ]); - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); - } + ]) + ->then( + null, + fn(\Throwable $e) => throw $this->exceptionFactory->create($e), + ); } public function generateSasUri(BlobSasBuilder $blobSasBuilder): UriInterface @@ -315,20 +374,28 @@ public function generateSasUri(BlobSasBuilder $blobSasBuilder): UriInterface /** * @param array $tags - * @return void */ public function setTags(array $tags): void { - try { - $this->client->put($this->uri, [ + $this->setTagsAsync($tags)->wait(); + } + + /** + * @param array $tags + */ + public function setTagsAsync(array $tags): PromiseInterface + { + return $this->client + ->putAsync($this->uri, [ 'query' => [ 'comp' => 'tags', ], 'body' => (new BlobTagsBody($tags))->toXml()->asXML(), - ]); - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); - } + ]) + ->then( + null, + fn(\Throwable $e) => throw $this->exceptionFactory->create($e), + ); } /** @@ -336,17 +403,21 @@ public function setTags(array $tags): void */ public function getTags(): array { - try { - $response = $this->client->get($this->uri, [ + /** @phpstan-ignore-next-line */ + return $this->getTagsAsync()->wait(); + } + + public function getTagsAsync(): PromiseInterface + { + return $this->client + ->getAsync($this->uri, [ 'query' => [ 'comp' => 'tags', ], - ]); - - $body = BlobTagsBody::fromXml(new \SimpleXMLElement($response->getBody()->getContents())); - return $body->tags; - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); - } + ]) + ->then( + fn(ResponseInterface $response) => BlobTagsBody::fromXml(new \SimpleXMLElement($response->getBody()->getContents()))->tags, + fn(\Throwable $e) => throw $this->exceptionFactory->create($e), + ); } } diff --git a/src/Blob/Models/BlobDownloadStreamingResult.php b/src/Blob/Models/BlobDownloadStreamingResult.php index bf8d990..c9dc664 100644 --- a/src/Blob/Models/BlobDownloadStreamingResult.php +++ b/src/Blob/Models/BlobDownloadStreamingResult.php @@ -4,6 +4,7 @@ namespace AzureOss\Storage\Blob\Models; +use Psr\Http\Message\ResponseInterface; use Psr\Http\Message\StreamInterface; final class BlobDownloadStreamingResult @@ -12,4 +13,12 @@ public function __construct( public readonly StreamInterface $content, public readonly BlobProperties $properties, ) {} + + public static function fromResponse(ResponseInterface $response): self + { + return new self( + $response->getBody(), + BlobProperties::fromResponseHeaders($response), + ); + } } From 777d732f02435cef9f99b6718b8d881d25417cf3 Mon Sep 17 00:00:00 2001 From: "brecht.vermeersch" Date: Tue, 25 Feb 2025 21:39:03 +0100 Subject: [PATCH 2/5] pint --- src/Blob/BlobClient.php | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/Blob/BlobClient.php b/src/Blob/BlobClient.php index 0711f76..9978919 100644 --- a/src/Blob/BlobClient.php +++ b/src/Blob/BlobClient.php @@ -22,9 +22,7 @@ use AzureOss\Storage\Common\Middleware\ClientFactory; use AzureOss\Storage\Common\Sas\SasProtocol; use GuzzleHttp\Client; -use GuzzleHttp\Exception\RequestException; use GuzzleHttp\Pool; -use GuzzleHttp\Promise\Promise; use GuzzleHttp\Promise\PromiseInterface; use GuzzleHttp\Psr7\Uri; use GuzzleHttp\Psr7\Utils as StreamUtils; @@ -332,7 +330,7 @@ private function putBlockListAsync(array $blocks, ?string $contentType, string $ 'x-ms-blob-content-md5' => base64_encode($contentMD5), ], 'body' => (new PutBlockRequestBody($blocks))->toXml()->asXML(), - ]); + ]); } public function copyFromUri(UriInterface $source): void From e3ca80cd3821065047945862a999f6060da87017 Mon Sep 17 00:00:00 2001 From: "brecht.vermeersch" Date: Wed, 26 Feb 2025 17:34:02 +0100 Subject: [PATCH 3/5] add async to other clients --- src/Blob/BlobClient.php | 160 +++++------- src/Blob/BlobContainerClient.php | 228 ++++++++++-------- src/Blob/BlobServiceClient.php | 94 ++++---- ...p => BlobStorageExceptionDeserializer.php} | 10 +- src/Blob/Helpers/StreamHelper.php | 55 +++++ .../RequestExceptionDeserializer.php | 15 ++ src/Common/Middleware/ClientFactory.php | 4 +- .../DeserializeExceptionMiddleware.php | 32 +++ tests/Blob/Benchmark/BlobClientBench.php | 6 +- tests/Blob/Feature/BlobClientTest.php | 14 +- 10 files changed, 351 insertions(+), 267 deletions(-) rename src/Blob/Exceptions/{BlobStorageExceptionFactory.php => BlobStorageExceptionDeserializer.php} (87%) create mode 100644 src/Blob/Helpers/StreamHelper.php create mode 100644 src/Common/Exceptions/RequestExceptionDeserializer.php create mode 100644 src/Common/Middleware/DeserializeExceptionMiddleware.php diff --git a/src/Blob/BlobClient.php b/src/Blob/BlobClient.php index 9978919..ec4cbf0 100644 --- a/src/Blob/BlobClient.php +++ b/src/Blob/BlobClient.php @@ -5,11 +5,12 @@ namespace AzureOss\Storage\Blob; use AzureOss\Storage\Blob\Exceptions\BlobNotFoundException; -use AzureOss\Storage\Blob\Exceptions\BlobStorageExceptionFactory; +use AzureOss\Storage\Blob\Exceptions\BlobStorageExceptionDeserializer; use AzureOss\Storage\Blob\Exceptions\InvalidBlobUriException; use AzureOss\Storage\Blob\Exceptions\UnableToGenerateSasException; use AzureOss\Storage\Blob\Helpers\BlobUriParserHelper; use AzureOss\Storage\Blob\Helpers\MetadataHelper; +use AzureOss\Storage\Blob\Helpers\StreamHelper; use AzureOss\Storage\Blob\Models\BlobDownloadStreamingResult; use AzureOss\Storage\Blob\Models\BlobProperties; use AzureOss\Storage\Blob\Models\UploadBlobOptions; @@ -34,8 +35,6 @@ final class BlobClient { private readonly Client $client; - private readonly BlobStorageExceptionFactory $exceptionFactory; - public readonly string $containerName; public readonly string $blobName; @@ -49,8 +48,7 @@ public function __construct( ) { $this->containerName = BlobUriParserHelper::getContainerName($uri); $this->blobName = BlobUriParserHelper::getBlobName($uri); - $this->client = (new ClientFactory())->create($uri, $sharedKeyCredentials); - $this->exceptionFactory = new BlobStorageExceptionFactory(); + $this->client = (new ClientFactory())->create($uri, $sharedKeyCredentials, new BlobStorageExceptionDeserializer()); } public function downloadStreaming(): BlobDownloadStreamingResult @@ -65,10 +63,8 @@ public function downloadStreamingAsync(): PromiseInterface ->getAsync($this->uri, [ 'stream' => true, ]) - ->then( - BlobDownloadStreamingResult::fromResponse(...), - fn(\Throwable $e) => throw $this->exceptionFactory->create($e), - ); + ->then(BlobDownloadStreamingResult::fromResponse(...)); + // ->otherwise(fn(\Throwable $e) => throw $this->exceptionFactory->create($e)); } public function getProperties(): BlobProperties @@ -81,10 +77,8 @@ public function getPropertiesAsync(): PromiseInterface { return $this->client ->headAsync($this->uri) - ->then( - BlobProperties::fromResponseHeaders(...), - fn(\Throwable $e) => throw $this->exceptionFactory->create($e), - ); + ->then(BlobProperties::fromResponseHeaders(...)); + // ->otherwise(fn(\Throwable $e) => throw $this->exceptionFactory->create($e)); } /** @@ -106,11 +100,8 @@ public function setMetadataAsync(array $metadata): PromiseInterface 'comp' => 'metadata', ], 'headers' => MetadataHelper::metadataToHeaders($metadata), - ]) - ->then( - null, - fn(\Throwable $e) => throw $this->exceptionFactory->create($e), - ); + ]); + // ->otherwise(fn(\Throwable $e) => throw $this->exceptionFactory->create($e)); } public function delete(): void @@ -120,12 +111,8 @@ public function delete(): void public function deleteAsync(): PromiseInterface { - return $this->client - ->deleteAsync($this->uri) - ->then( - null, - fn(\Throwable $e) => throw $this->exceptionFactory->create($e), - ); + return $this->client->deleteAsync($this->uri); + // ->otherwise(fn(\Throwable $e) => throw $this->exceptionFactory->create($e)); } public function deleteIfExists(): void @@ -135,8 +122,7 @@ public function deleteIfExists(): void public function deleteIfExistsAsync(): PromiseInterface { - return $this->deleteAsync()->then( - null, + return $this->deleteAsync()->otherwise( function (\Throwable $e) { if ($e instanceof BlobNotFoundException) { return null; @@ -156,8 +142,8 @@ public function exists(): bool public function existsAsync(): PromiseInterface { return $this->getPropertiesAsync() - ->then( - fn() => true, + ->then(fn() => true) + ->otherwise( function (\Throwable $e) { if ($e instanceof BlobNotFoundException) { return false; @@ -185,7 +171,7 @@ public function uploadAsync($content, ?UploadBlobOptions $options = null): Promi $options = new UploadBlobOptions(); } - $content = $this->createUploadStream($content, $options); + $content = StreamHelper::createUploadStream($content, $options->maximumTransferSize); if ($content->getSize() === null || ! $content->isSeekable()) { return $this->uploadInSequentialBlocksAsync($content, $options); @@ -196,24 +182,6 @@ public function uploadAsync($content, ?UploadBlobOptions $options = null): Promi } } - - /** - * @param string|resource|StreamInterface $content - */ - private function createUploadStream($content, UploadBlobOptions $options): StreamInterface - { - if ($content instanceof StreamInterface) { - $content = $content->detach(); - } - - // fix network streams only reading 8KB chunks - if (is_resource($content)) { - stream_set_chunk_size($content, $options->maximumTransferSize); - } - - return StreamUtils::streamFor($content); - } - private function uploadSingleAsync(StreamInterface $content, UploadBlobOptions $options): PromiseInterface { return $this->client @@ -224,52 +192,57 @@ private function uploadSingleAsync(StreamInterface $content, UploadBlobOptions $ 'Content-Length' => $content->getSize(), ], 'body' => $content, - ]) - ->then( - null, - fn(\Throwable $e) => throw $this->exceptionFactory->create($e), - ); + ]); + // ->otherwise(fn(\Throwable $e) => throw $this->exceptionFactory->create($e)); } private function uploadInSequentialBlocksAsync(StreamInterface $content, UploadBlobOptions $options): PromiseInterface { $blocks = []; - $contextMD5 = hash_init('md5'); - while (true) { - $blockContent = $content->read($options->maximumTransferSize); - - if ($blockContent === "") { - break; - } + $putBlockRequestGenerator = function () use (&$content, &$options, &$blocks, &$contextMD5): \Generator { + while (true) { + $blockContent = $content->read($options->maximumTransferSize); + if ($blockContent === "") { + break; + } - $block = new Block(count($blocks), BlockType::UNCOMMITTED); - $blocks[] = $block; + $block = new Block(count($blocks), BlockType::UNCOMMITTED); + $blocks[] = $block; - hash_update($contextMD5, $blockContent); + hash_update($contextMD5, $blockContent); - $this->putBlockAsync($block, $blockContent)->wait(); - } - - $contentMD5 = hash_final($contextMD5, true); + yield fn() => $this->putBlockAsync($block, $blockContent); + } + }; - return $this->putBlockListAsync( - $blocks, - $options->contentType, - $contentMD5, - ); + return (new Pool( + $this->client, + $putBlockRequestGenerator(), + ['concurrency' => 1], + )) + ->promise() + ->then( + function () use (&$blocks, &$options, &$contextMD5) { + return $this->putBlockListAsync( + $blocks, + $options->contentType, + hash_final($contextMD5, true), + ); + }, + ); + // ->otherwise(fn(\Throwable $e) => throw $this->exceptionFactory->create($e)); } private function uploadInParallelBlocksAsync(StreamInterface $content, UploadBlobOptions $options): PromiseInterface { $blocks = []; - $putBlockRequestGenerator = function () use ($content, $options, &$blocks): \Generator { + $putBlockRequestGenerator = function () use (&$content, &$options, &$blocks): \Generator { while (true) { $blockContent = StreamUtils::streamFor(); StreamUtils::copyToStream($content, $blockContent, $options->maximumTransferSize); - if ($blockContent->getSize() === 0) { break; } @@ -281,23 +254,24 @@ private function uploadInParallelBlocksAsync(StreamInterface $content, UploadBlo } }; - $pool = new Pool($this->client, $putBlockRequestGenerator(), [ - 'concurrency' => $options->maximumConcurrency, - 'rejected' => function (\Exception $e) { - throw $this->exceptionFactory->create($e); - }, - ]); + $pool = new Pool( + $this->client, + $putBlockRequestGenerator(), + ['concurrency' => $options->maximumConcurrency], + ); return $pool ->promise() ->then( - fn() => $this->putBlockListAsync( - $blocks, - $options->contentType, - StreamUtils::hash($content, 'md5', true), - ), - fn(\Throwable $e) => throw $this->exceptionFactory->create($e), + function () use (&$content, &$blocks, &$options) { + return $this->putBlockListAsync( + $blocks, + $options->contentType, + StreamUtils::hash($content, 'md5', true), + ); + }, ); + // ->otherwise(fn(\Throwable $e) => throw $this->exceptionFactory->create($e)); } private function putBlockAsync(Block $block, StreamInterface|string $content): PromiseInterface @@ -345,11 +319,8 @@ public function copyFromUriAsync(UriInterface $source): PromiseInterface 'headers' => [ 'x-ms-copy-source' => (string) $source, ], - ]) - ->then( - null, - fn(\Throwable $e) => throw $this->exceptionFactory->create($e), - ); + ]); + // ->otherwise(fn(\Throwable $e) => throw $this->exceptionFactory->create($e)); } public function generateSasUri(BlobSasBuilder $blobSasBuilder): UriInterface @@ -389,11 +360,8 @@ public function setTagsAsync(array $tags): PromiseInterface 'comp' => 'tags', ], 'body' => (new BlobTagsBody($tags))->toXml()->asXML(), - ]) - ->then( - null, - fn(\Throwable $e) => throw $this->exceptionFactory->create($e), - ); + ]); + // ->otherwise(fn(\Throwable $e) => throw $this->exceptionFactory->create($e)); } /** @@ -415,7 +383,7 @@ public function getTagsAsync(): PromiseInterface ]) ->then( fn(ResponseInterface $response) => BlobTagsBody::fromXml(new \SimpleXMLElement($response->getBody()->getContents()))->tags, - fn(\Throwable $e) => throw $this->exceptionFactory->create($e), ); + // ->otherwise(fn(\Throwable $e) => throw $this->exceptionFactory->create($e)); } } diff --git a/src/Blob/BlobContainerClient.php b/src/Blob/BlobContainerClient.php index 1efe14c..3abb272 100644 --- a/src/Blob/BlobContainerClient.php +++ b/src/Blob/BlobContainerClient.php @@ -4,7 +4,7 @@ namespace AzureOss\Storage\Blob; -use AzureOss\Storage\Blob\Exceptions\BlobStorageExceptionFactory; +use AzureOss\Storage\Blob\Exceptions\BlobStorageExceptionDeserializer; use AzureOss\Storage\Blob\Exceptions\ContainerAlreadyExistsException; use AzureOss\Storage\Blob\Exceptions\ContainerNotFoundException; use AzureOss\Storage\Blob\Exceptions\InvalidBlobUriException; @@ -23,7 +23,7 @@ use AzureOss\Storage\Common\Middleware\ClientFactory; use AzureOss\Storage\Common\Sas\SasProtocol; use GuzzleHttp\Client; -use GuzzleHttp\Exception\RequestException; +use GuzzleHttp\Promise\PromiseInterface; use GuzzleHttp\Psr7\Uri; use Psr\Http\Message\UriInterface; @@ -31,8 +31,6 @@ final class BlobContainerClient { private readonly Client $client; - private readonly BlobStorageExceptionFactory $exceptionFactory; - public readonly string $containerName; /** @@ -43,8 +41,7 @@ public function __construct( public readonly ?StorageSharedKeyCredential $sharedKeyCredentials = null, ) { $this->containerName = BlobUriParserHelper::getContainerName($uri); - $this->client = (new ClientFactory())->create($uri, $sharedKeyCredentials); - $this->exceptionFactory = new BlobStorageExceptionFactory(); + $this->client = (new ClientFactory())->create($uri, $sharedKeyCredentials, new BlobStorageExceptionDeserializer()); } public function getBlobClient(string $blobName): BlobClient @@ -57,82 +54,105 @@ public function getBlobClient(string $blobName): BlobClient public function create(): void { - try { - $this->client->put($this->uri, [ - 'query' => [ - 'restype' => 'container', - ], - ]); - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); - } + $this->createAsync()->wait(); + } + + public function createAsync(): PromiseInterface + { + return $this->client->putAsync($this->uri, [ + 'query' => [ + 'restype' => 'container', + ], + ]); } public function createIfNotExists(): void { - try { - $this->create(); - } catch (ContainerAlreadyExistsException) { - // do nothing - } + $this->createIfNotExistsAsync()->wait(); + } + + public function createIfNotExistsAsync(): PromiseInterface + { + return $this->createAsync() + ->otherwise(function (\Throwable $e) { + if ($e instanceof ContainerAlreadyExistsException) { + return; + } + + throw $e; + }); } public function delete(): void { - try { - $this->client->delete($this->uri, [ - 'query' => [ - 'restype' => 'container', - ], - ]); - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); - } + $this->deleteAsync()->wait(); + } + + public function deleteAsync(): PromiseInterface + { + return $this->client->deleteAsync($this->uri, [ + 'query' => [ + 'restype' => 'container', + ], + ]); } public function deleteIfExists(): void { - try { - $this->delete(); - } catch (ContainerNotFoundException $e) { - // do nothing - } + $this->deleteIfExistsAsync()->wait(); + } + + public function deleteIfExistsAsync(): PromiseInterface + { + return $this->deleteAsync() + ->otherwise(function (\Throwable $e) { + if ($e instanceof ContainerNotFoundException) { + return; + } + + throw $e; + }); } public function exists(): bool { - try { - $this->client->head($this->uri, [ + /** @phpstan-ignore-next-line */ + return $this->existsAsync()->wait(); + } + + public function existsAsync(): PromiseInterface + { + return $this->client + ->headAsync($this->uri, [ 'query' => [ 'restype' => 'container', ], - ]); - - return true; - } catch (RequestException $e) { - $e = $this->exceptionFactory->create($e); - - if ($e instanceof ContainerNotFoundException) { - return false; - } + ]) + ->then(fn() => true) + ->otherwise(function (\Throwable $e) { + if ($e instanceof ContainerNotFoundException) { + return false; + } - throw $e; - } + throw $e; + }); } public function getProperties(): BlobContainerProperties { - try { - $response = $this->client->get($this->uri, [ + /** @phpstan-ignore-next-line */ + return $this->getPropertiesAsync()->wait(); + } + + public function getPropertiesAsync(): PromiseInterface + { + return $this->client + ->getAsync($this->uri, [ 'query' => [ 'restype' => 'container', ], - ]); - - return BlobContainerProperties::fromResponseHeaders($response); - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); - } + ]) + ->then(BlobContainerProperties::fromResponseHeaders(...)); } /** @@ -140,17 +160,21 @@ public function getProperties(): BlobContainerProperties */ public function setMetadata(array $metadata): void { - try { - $this->client->put($this->uri, [ - 'query' => [ - 'restype' => 'container', - 'comp' => 'metadata', - ], - 'headers' => MetadataHelper::metadataToHeaders($metadata), - ]); - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); - } + $this->setMetadataAsync($metadata)->wait(); + } + + /** + * @param array $metadata + */ + public function setMetadataAsync(array $metadata): PromiseInterface + { + return $this->client->putAsync($this->uri, [ + 'query' => [ + 'restype' => 'container', + 'comp' => 'metadata', + ], + 'headers' => MetadataHelper::metadataToHeaders($metadata), + ]); } /** @@ -202,22 +226,18 @@ public function getBlobsByHierarchy(?string $prefix = null, string $delimiter = private function listBlobs(?string $prefix, ?string $delimiter, string $marker, ?int $maxResults): ListBlobsResponseBody { - try { - $response = $this->client->get($this->uri, [ - 'query' => [ - 'restype' => 'container', - 'comp' => 'list', - 'prefix' => $prefix, - 'marker' => $marker !== "" ? $marker : null, - 'delimiter' => $delimiter, - 'maxresults' => $maxResults, - ], - ]); - - return ListBlobsResponseBody::fromXml(new \SimpleXMLElement($response->getBody()->getContents())); - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); - } + $response = $this->client->get($this->uri, [ + 'query' => [ + 'restype' => 'container', + 'comp' => 'list', + 'prefix' => $prefix, + 'marker' => $marker !== "" ? $marker : null, + 'delimiter' => $delimiter, + 'maxresults' => $maxResults, + ], + ]); + + return ListBlobsResponseBody::fromXml(new \SimpleXMLElement($response->getBody()->getContents())); } public function canGenerateSasUri(): bool @@ -248,32 +268,28 @@ public function generateSasUri(BlobSasBuilder $blobSasBuilder): UriInterface */ public function findBlobsByTag(string $tagFilterSqlExpression): \Generator { - try { - $nextMarker = ""; - - while (true) { - $response = $this->client->get($this->uri, [ - 'query' => [ - 'restype' => 'container', - 'comp' => 'blobs', - 'where' => $tagFilterSqlExpression, - 'marker' => $nextMarker !== "" ? $nextMarker : null, - ], - ]); - - $body = FindBlobsByTagBody::fromXml(new \SimpleXMLElement($response->getBody()->getContents())); - $nextMarker = $body->nextMarker; - - foreach ($body->blobs as $blob) { - yield $blob; - } + $nextMarker = ""; - if ($nextMarker === "") { - break; - } + while (true) { + $response = $this->client->get($this->uri, [ + 'query' => [ + 'restype' => 'container', + 'comp' => 'blobs', + 'where' => $tagFilterSqlExpression, + 'marker' => $nextMarker !== "" ? $nextMarker : null, + ], + ]); + + $body = FindBlobsByTagBody::fromXml(new \SimpleXMLElement($response->getBody()->getContents())); + $nextMarker = $body->nextMarker; + + foreach ($body->blobs as $blob) { + yield $blob; + } + + if ($nextMarker === "") { + break; } - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); } } } diff --git a/src/Blob/BlobServiceClient.php b/src/Blob/BlobServiceClient.php index 45188d8..1be7fde 100644 --- a/src/Blob/BlobServiceClient.php +++ b/src/Blob/BlobServiceClient.php @@ -4,7 +4,7 @@ namespace AzureOss\Storage\Blob; -use AzureOss\Storage\Blob\Exceptions\BlobStorageExceptionFactory; +use AzureOss\Storage\Blob\Exceptions\BlobStorageExceptionDeserializer; use AzureOss\Storage\Blob\Exceptions\InvalidConnectionStringException; use AzureOss\Storage\Blob\Exceptions\UnableToGenerateSasException; use AzureOss\Storage\Blob\Helpers\BlobUriParserHelper; @@ -19,7 +19,6 @@ use AzureOss\Storage\Common\Sas\AccountSasServices; use AzureOss\Storage\Common\Sas\SasProtocol; use GuzzleHttp\Client; -use GuzzleHttp\Exception\RequestException; use GuzzleHttp\Psr7\Uri; use Psr\Http\Message\UriInterface; @@ -27,16 +26,13 @@ final class BlobServiceClient { private readonly Client $client; - private readonly BlobStorageExceptionFactory $exceptionFactory; - public function __construct( public UriInterface $uri, public readonly ?StorageSharedKeyCredential $sharedKeyCredentials = null, ) { // must always include the forward slash (/) to separate the host name from the path and query portions of the URI. $this->uri = $uri->withPath(rtrim($uri->getPath(), '/') . "/"); - $this->client = (new ClientFactory())->create($this->uri, $sharedKeyCredentials); - $this->exceptionFactory = new BlobStorageExceptionFactory(); + $this->client = (new ClientFactory())->create($this->uri, $sharedKeyCredentials, new BlobStorageExceptionDeserializer()); } public static function fromConnectionString(string $connectionString): self @@ -73,30 +69,26 @@ public function getContainerClient(string $containerName): BlobContainerClient */ public function getBlobContainers(?string $prefix = null): \Generator { - try { - $nextMarker = ""; - - while (true) { - $response = $this->client->get($this->uri, [ - 'query' => [ - 'comp' => 'list', - 'marker' => $nextMarker !== "" ? $nextMarker : null, - 'prefix' => $prefix, - ], - ]); - $body = ListContainersResponseBody::fromXml(new \SimpleXMLElement($response->getBody()->getContents())); - $nextMarker = $body->nextMarker; - - foreach ($body->containers as $container) { - yield $container; - } - - if ($nextMarker === "") { - break; - } + $nextMarker = ""; + + while (true) { + $response = $this->client->get($this->uri, [ + 'query' => [ + 'comp' => 'list', + 'marker' => $nextMarker !== "" ? $nextMarker : null, + 'prefix' => $prefix, + ], + ]); + $body = ListContainersResponseBody::fromXml(new \SimpleXMLElement($response->getBody()->getContents())); + $nextMarker = $body->nextMarker; + + foreach ($body->containers as $container) { + yield $container; + } + + if ($nextMarker === "") { + break; } - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); } } @@ -107,31 +99,27 @@ public function getBlobContainers(?string $prefix = null): \Generator */ public function findBlobsByTag(string $tagFilterSqlExpression): \Generator { - try { - $nextMarker = ""; - - while (true) { - $response = $this->client->get($this->uri, [ - 'query' => [ - 'comp' => 'blobs', - 'where' => $tagFilterSqlExpression, - 'marker' => $nextMarker !== "" ? $nextMarker : null, - ], - ]); - - $body = FindBlobsByTagBody::fromXml(new \SimpleXMLElement($response->getBody()->getContents())); - $nextMarker = $body->nextMarker; - - foreach ($body->blobs as $blob) { - yield $blob; - } - - if ($nextMarker === "") { - break; - } + $nextMarker = ""; + + while (true) { + $response = $this->client->get($this->uri, [ + 'query' => [ + 'comp' => 'blobs', + 'where' => $tagFilterSqlExpression, + 'marker' => $nextMarker !== "" ? $nextMarker : null, + ], + ]); + + $body = FindBlobsByTagBody::fromXml(new \SimpleXMLElement($response->getBody()->getContents())); + $nextMarker = $body->nextMarker; + + foreach ($body->blobs as $blob) { + yield $blob; + } + + if ($nextMarker === "") { + break; } - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); } } diff --git a/src/Blob/Exceptions/BlobStorageExceptionFactory.php b/src/Blob/Exceptions/BlobStorageExceptionDeserializer.php similarity index 87% rename from src/Blob/Exceptions/BlobStorageExceptionFactory.php rename to src/Blob/Exceptions/BlobStorageExceptionDeserializer.php index eef3d22..56327b3 100644 --- a/src/Blob/Exceptions/BlobStorageExceptionFactory.php +++ b/src/Blob/Exceptions/BlobStorageExceptionDeserializer.php @@ -5,20 +5,16 @@ namespace AzureOss\Storage\Blob\Exceptions; use AzureOss\Storage\Blob\Responses\ErrorResponse; +use AzureOss\Storage\Common\Exceptions\RequestExceptionDeserializer; use GuzzleHttp\Exception\RequestException; use Psr\Http\Message\ResponseInterface; /** * @internal */ -final class BlobStorageExceptionFactory +final class BlobStorageExceptionDeserializer implements RequestExceptionDeserializer { - public function create(\Throwable $e): \Throwable - { - return $e instanceof RequestException ? $this->createFromRequestException($e) : $e; - } - - private function createFromRequestException(RequestException $e): \Exception + public function deserialize(RequestException $e): \Exception { $response = $e->getResponse(); if ($response === null) { diff --git a/src/Blob/Helpers/StreamHelper.php b/src/Blob/Helpers/StreamHelper.php new file mode 100644 index 0000000..3f95f44 --- /dev/null +++ b/src/Blob/Helpers/StreamHelper.php @@ -0,0 +1,55 @@ +detach(); + if ($detached === null) { + return $content; + } + + return self::createUploadStreamFromResource($detached, $blockSize); + } + + /** + * @param resource $content + */ + private static function createUploadStreamFromResource($content, int $blockSize): StreamInterface + { + stream_set_chunk_size($content, $blockSize); + + return StreamUtils::streamFor($content); + } +} diff --git a/src/Common/Exceptions/RequestExceptionDeserializer.php b/src/Common/Exceptions/RequestExceptionDeserializer.php new file mode 100644 index 0000000..0da74ae --- /dev/null +++ b/src/Common/Exceptions/RequestExceptionDeserializer.php @@ -0,0 +1,15 @@ +before('http_errors', new DeserializeExceptionMiddleware($exceptionDeserializer)); $handlerStack->push(new AddXMsClientRequestIdMiddleware()); $handlerStack->push(new AddXMsDateHeaderMiddleware()); $handlerStack->push(new AddXMsVersionMiddleware(ApiVersion::LATEST)); diff --git a/src/Common/Middleware/DeserializeExceptionMiddleware.php b/src/Common/Middleware/DeserializeExceptionMiddleware.php new file mode 100644 index 0000000..5802f09 --- /dev/null +++ b/src/Common/Middleware/DeserializeExceptionMiddleware.php @@ -0,0 +1,32 @@ +otherwise(function (\Throwable $e) { + if ($e instanceof RequestException) { + throw $this->exceptionDeserializer->deserialize($e); + } + + throw $e; + }); + }; + } +} diff --git a/tests/Blob/Benchmark/BlobClientBench.php b/tests/Blob/Benchmark/BlobClientBench.php index 1319a04..6af8248 100644 --- a/tests/Blob/Benchmark/BlobClientBench.php +++ b/tests/Blob/Benchmark/BlobClientBench.php @@ -43,8 +43,8 @@ public function provideFiles(): \Generator { yield '20x10KB' => ['path' => FileFactory::create(10_000), 'count' => 100]; yield '10x10MB' => ['path' => FileFactory::create(10_000_000), 'count' => 10]; - yield '5x100MB' => ['path' => FileFactory::create(100_000_000), 'count' => 1]; - yield '2x800MB' => ['path' => FileFactory::create(800_000_000), 'count' => 1]; - yield '1x1.6GB' => ['path' => FileFactory::create(1_600_000_000), 'count' => 1]; + yield '5x100MB' => ['path' => FileFactory::create(100_000_000), 'count' => 5]; + yield '2x1GB' => ['path' => FileFactory::create(1_000_000_000), 'count' => 2]; + yield '1x4GB' => ['path' => FileFactory::create(4_000_000_000), 'count' => 1]; } } diff --git a/tests/Blob/Feature/BlobClientTest.php b/tests/Blob/Feature/BlobClientTest.php index f4c4057..7f65622 100644 --- a/tests/Blob/Feature/BlobClientTest.php +++ b/tests/Blob/Feature/BlobClientTest.php @@ -211,6 +211,11 @@ public function upload_works_with_unknown_sized_stream(): void $stream = new class ($file) implements StreamInterface { use StreamDecoratorTrait; + public function detach() + { + return null; + } + public function getSize(): ?int { return null; @@ -238,7 +243,14 @@ public function getSize(): ?int public function upload_works_with_non_seekable_stream(): void { FileFactory::withStream(1000, function (StreamInterface $file) { - $stream = new NoSeekStream($file); + $stream = new class (new NoSeekStream($file)) implements StreamInterface { + use StreamDecoratorTrait; + + public function detach() + { + return null; + } + }; $beforeUploadContent = $file->getContents(); $file->rewind(); From 37cb1e72adeacfa968c634314fc288646d69b652 Mon Sep 17 00:00:00 2001 From: brecht-vermeersch <20967950+brecht-vermeersch@users.noreply.github.com> Date: Wed, 26 Feb 2025 16:34:38 +0000 Subject: [PATCH 4/5] Fixes coding style --- src/Blob/BlobContainerClient.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Blob/BlobContainerClient.php b/src/Blob/BlobContainerClient.php index 3abb272..d1257d0 100644 --- a/src/Blob/BlobContainerClient.php +++ b/src/Blob/BlobContainerClient.php @@ -117,7 +117,7 @@ public function deleteIfExistsAsync(): PromiseInterface public function exists(): bool { /** @phpstan-ignore-next-line */ - return $this->existsAsync()->wait(); + return $this->existsAsync()->wait(); } public function existsAsync(): PromiseInterface From 73949a23168cb5e2c22a8f124dbb9ea5b1ddb1f3 Mon Sep 17 00:00:00 2001 From: "brecht.vermeersch" Date: Wed, 26 Feb 2025 19:03:34 +0100 Subject: [PATCH 5/5] remove commented out code --- src/Blob/BlobClient.php | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/Blob/BlobClient.php b/src/Blob/BlobClient.php index ec4cbf0..96d32a2 100644 --- a/src/Blob/BlobClient.php +++ b/src/Blob/BlobClient.php @@ -64,7 +64,6 @@ public function downloadStreamingAsync(): PromiseInterface 'stream' => true, ]) ->then(BlobDownloadStreamingResult::fromResponse(...)); - // ->otherwise(fn(\Throwable $e) => throw $this->exceptionFactory->create($e)); } public function getProperties(): BlobProperties @@ -78,7 +77,6 @@ public function getPropertiesAsync(): PromiseInterface return $this->client ->headAsync($this->uri) ->then(BlobProperties::fromResponseHeaders(...)); - // ->otherwise(fn(\Throwable $e) => throw $this->exceptionFactory->create($e)); } /** @@ -101,7 +99,6 @@ public function setMetadataAsync(array $metadata): PromiseInterface ], 'headers' => MetadataHelper::metadataToHeaders($metadata), ]); - // ->otherwise(fn(\Throwable $e) => throw $this->exceptionFactory->create($e)); } public function delete(): void @@ -112,7 +109,6 @@ public function delete(): void public function deleteAsync(): PromiseInterface { return $this->client->deleteAsync($this->uri); - // ->otherwise(fn(\Throwable $e) => throw $this->exceptionFactory->create($e)); } public function deleteIfExists(): void @@ -193,7 +189,6 @@ private function uploadSingleAsync(StreamInterface $content, UploadBlobOptions $ ], 'body' => $content, ]); - // ->otherwise(fn(\Throwable $e) => throw $this->exceptionFactory->create($e)); } private function uploadInSequentialBlocksAsync(StreamInterface $content, UploadBlobOptions $options): PromiseInterface @@ -232,7 +227,6 @@ function () use (&$blocks, &$options, &$contextMD5) { ); }, ); - // ->otherwise(fn(\Throwable $e) => throw $this->exceptionFactory->create($e)); } private function uploadInParallelBlocksAsync(StreamInterface $content, UploadBlobOptions $options): PromiseInterface @@ -271,7 +265,6 @@ function () use (&$content, &$blocks, &$options) { ); }, ); - // ->otherwise(fn(\Throwable $e) => throw $this->exceptionFactory->create($e)); } private function putBlockAsync(Block $block, StreamInterface|string $content): PromiseInterface @@ -320,7 +313,6 @@ public function copyFromUriAsync(UriInterface $source): PromiseInterface 'x-ms-copy-source' => (string) $source, ], ]); - // ->otherwise(fn(\Throwable $e) => throw $this->exceptionFactory->create($e)); } public function generateSasUri(BlobSasBuilder $blobSasBuilder): UriInterface @@ -361,7 +353,6 @@ public function setTagsAsync(array $tags): PromiseInterface ], 'body' => (new BlobTagsBody($tags))->toXml()->asXML(), ]); - // ->otherwise(fn(\Throwable $e) => throw $this->exceptionFactory->create($e)); } /** @@ -384,6 +375,5 @@ public function getTagsAsync(): PromiseInterface ->then( fn(ResponseInterface $response) => BlobTagsBody::fromXml(new \SimpleXMLElement($response->getBody()->getContents()))->tags, ); - // ->otherwise(fn(\Throwable $e) => throw $this->exceptionFactory->create($e)); } }