diff --git a/src/Blob/BlobClient.php b/src/Blob/BlobClient.php index 73ebf59..96d32a2 100644 --- a/src/Blob/BlobClient.php +++ b/src/Blob/BlobClient.php @@ -5,11 +5,12 @@ namespace AzureOss\Storage\Blob; use AzureOss\Storage\Blob\Exceptions\BlobNotFoundException; -use AzureOss\Storage\Blob\Exceptions\BlobStorageExceptionFactory; +use AzureOss\Storage\Blob\Exceptions\BlobStorageExceptionDeserializer; use AzureOss\Storage\Blob\Exceptions\InvalidBlobUriException; use AzureOss\Storage\Blob\Exceptions\UnableToGenerateSasException; use AzureOss\Storage\Blob\Helpers\BlobUriParserHelper; use AzureOss\Storage\Blob\Helpers\MetadataHelper; +use AzureOss\Storage\Blob\Helpers\StreamHelper; use AzureOss\Storage\Blob\Models\BlobDownloadStreamingResult; use AzureOss\Storage\Blob\Models\BlobProperties; use AzureOss\Storage\Blob\Models\UploadBlobOptions; @@ -22,11 +23,11 @@ use AzureOss\Storage\Common\Middleware\ClientFactory; use AzureOss\Storage\Common\Sas\SasProtocol; use GuzzleHttp\Client; -use GuzzleHttp\Exception\RequestException; use GuzzleHttp\Pool; use GuzzleHttp\Promise\PromiseInterface; use GuzzleHttp\Psr7\Uri; use GuzzleHttp\Psr7\Utils as StreamUtils; +use Psr\Http\Message\ResponseInterface; use Psr\Http\Message\StreamInterface; use Psr\Http\Message\UriInterface; @@ -34,8 +35,6 @@ final class BlobClient { private readonly Client $client; - private readonly BlobStorageExceptionFactory $exceptionFactory; - public readonly string $containerName; public readonly string $blobName; @@ -49,82 +48,106 @@ public function __construct( ) { $this->containerName = BlobUriParserHelper::getContainerName($uri); $this->blobName = BlobUriParserHelper::getBlobName($uri); - $this->client = (new ClientFactory())->create($uri, $sharedKeyCredentials); - $this->exceptionFactory = new BlobStorageExceptionFactory(); + $this->client = (new ClientFactory())->create($uri, $sharedKeyCredentials, new BlobStorageExceptionDeserializer()); } public function downloadStreaming(): BlobDownloadStreamingResult { - try { - $response = $this->client->get($this->uri, [ - 'stream' => true, - ]); + /** @phpstan-ignore-next-line */ + return $this->downloadStreamingAsync()->wait(); + } - return new BlobDownloadStreamingResult( - $response->getBody(), - BlobProperties::fromResponseHeaders($response), - ); - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); - } + public function downloadStreamingAsync(): PromiseInterface + { + return $this->client + ->getAsync($this->uri, [ + 'stream' => true, + ]) + ->then(BlobDownloadStreamingResult::fromResponse(...)); } public function getProperties(): BlobProperties { - try { - $response = $this->client->head($this->uri); + /** @phpstan-ignore-next-line */ + return $this->getPropertiesAsync()->wait(); + } - return BlobProperties::fromResponseHeaders($response); - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); - } + public function getPropertiesAsync(): PromiseInterface + { + return $this->client + ->headAsync($this->uri) + ->then(BlobProperties::fromResponseHeaders(...)); } /** * @param array $metadata - * @return void */ public function setMetadata(array $metadata): void { - try { - $this->client->put($this->uri, [ + $this->setMetadataAsync($metadata)->wait(); + } + + /** + * @param array $metadata + */ + public function setMetadataAsync(array $metadata): PromiseInterface + { + return $this->client + ->putAsync($this->uri, [ 'query' => [ 'comp' => 'metadata', ], 'headers' => MetadataHelper::metadataToHeaders($metadata), ]); - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); - } } public function delete(): void { - try { - $this->client->delete($this->uri); - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); - } + $this->deleteAsync()->wait(); + } + + public function deleteAsync(): PromiseInterface + { + return $this->client->deleteAsync($this->uri); } public function deleteIfExists(): void { - try { - $this->delete(); - } catch (BlobNotFoundException) { - // do nothing - } + $this->deleteIfExistsAsync()->wait(); + } + + public function deleteIfExistsAsync(): PromiseInterface + { + return $this->deleteAsync()->otherwise( + function (\Throwable $e) { + if ($e instanceof BlobNotFoundException) { + return null; + } + + throw $e; + }, + ); } public function exists(): bool { - try { - $this->getProperties(); + /** @phpstan-ignore-next-line */ + return $this->existsAsync()->wait(); + } - return true; - } catch (BlobNotFoundException) { - return false; - } + public function existsAsync(): PromiseInterface + { + return $this->getPropertiesAsync() + ->then(fn() => true) + ->otherwise( + function (\Throwable $e) { + if ($e instanceof BlobNotFoundException) { + return false; + } + + throw $e; + }, + ); } /** @@ -132,42 +155,33 @@ public function exists(): bool */ public function upload($content, ?UploadBlobOptions $options = null): void { - if ($options === null) { - $options = new UploadBlobOptions(); - } - - $content = $this->createUploadStream($content, $options); - - if ($content->getSize() === null || ! $content->isSeekable()) { - $this->uploadInSequentialBlocks($content, $options); - } elseif ($content->getSize() > $options->initialTransferSize) { - $this->uploadInParallelBlocks($content, $options); - } else { - $this->uploadSingle($content, $options); - } + $this->uploadAsync($content, $options)->wait(); } /** * @param string|resource|StreamInterface $content */ - private function createUploadStream($content, UploadBlobOptions $options): StreamInterface + public function uploadAsync($content, ?UploadBlobOptions $options = null): PromiseInterface { - if ($content instanceof StreamInterface) { - $content = $content->detach(); + if ($options === null) { + $options = new UploadBlobOptions(); } - // fix network streams only reading 8KB chunks - if (is_resource($content)) { - stream_set_chunk_size($content, $options->maximumTransferSize); - } + $content = StreamHelper::createUploadStream($content, $options->maximumTransferSize); - return StreamUtils::streamFor($content); + if ($content->getSize() === null || ! $content->isSeekable()) { + return $this->uploadInSequentialBlocksAsync($content, $options); + } elseif ($content->getSize() > $options->initialTransferSize) { + return $this->uploadInParallelBlocksAsync($content, $options); + } else { + return $this->uploadSingleAsync($content, $options); + } } - private function uploadSingle(StreamInterface $content, UploadBlobOptions $options): void + private function uploadSingleAsync(StreamInterface $content, UploadBlobOptions $options): PromiseInterface { - try { - $this->client->put($this->uri, [ + return $this->client + ->putAsync($this->uri, [ 'headers' => [ 'x-ms-blob-type' => 'BlockBlob', 'Content-Type' => $options->contentType, @@ -175,50 +189,54 @@ private function uploadSingle(StreamInterface $content, UploadBlobOptions $optio ], 'body' => $content, ]); - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); - } } - private function uploadInSequentialBlocks(StreamInterface $content, UploadBlobOptions $options): void + private function uploadInSequentialBlocksAsync(StreamInterface $content, UploadBlobOptions $options): PromiseInterface { $blocks = []; - $contextMD5 = hash_init('md5'); - while (true) { - $blockContent = $content->read($options->maximumTransferSize); - - if ($blockContent === "") { - break; - } - - $block = new Block(count($blocks), BlockType::UNCOMMITTED); - $blocks[] = $block; + $putBlockRequestGenerator = function () use (&$content, &$options, &$blocks, &$contextMD5): \Generator { + while (true) { + $blockContent = $content->read($options->maximumTransferSize); + if ($blockContent === "") { + break; + } - hash_update($contextMD5, $blockContent); + $block = new Block(count($blocks), BlockType::UNCOMMITTED); + $blocks[] = $block; - $this->putBlockAsync($block, $blockContent)->wait(); - } + hash_update($contextMD5, $blockContent); - $contentMD5 = hash_final($contextMD5, true); + yield fn() => $this->putBlockAsync($block, $blockContent); + } + }; - $this->putBlockList( - $blocks, - $options->contentType, - $contentMD5, - ); + return (new Pool( + $this->client, + $putBlockRequestGenerator(), + ['concurrency' => 1], + )) + ->promise() + ->then( + function () use (&$blocks, &$options, &$contextMD5) { + return $this->putBlockListAsync( + $blocks, + $options->contentType, + hash_final($contextMD5, true), + ); + }, + ); } - private function uploadInParallelBlocks(StreamInterface $content, UploadBlobOptions $options): void + private function uploadInParallelBlocksAsync(StreamInterface $content, UploadBlobOptions $options): PromiseInterface { $blocks = []; - $putBlockRequestGenerator = function () use ($content, $options, &$blocks): \Generator { + $putBlockRequestGenerator = function () use (&$content, &$options, &$blocks): \Generator { while (true) { $blockContent = StreamUtils::streamFor(); StreamUtils::copyToStream($content, $blockContent, $options->maximumTransferSize); - if ($blockContent->getSize() === 0) { break; } @@ -230,20 +248,23 @@ private function uploadInParallelBlocks(StreamInterface $content, UploadBlobOpti } }; - $pool = new Pool($this->client, $putBlockRequestGenerator(), [ - 'concurrency' => $options->maximumConcurrency, - 'rejected' => function (\Exception $e) { - throw $this->exceptionFactory->create($e); - }, - ]); - - $pool->promise()->wait(); - - $this->putBlockList( - $blocks, - $options->contentType, - StreamUtils::hash($content, 'md5', true), + $pool = new Pool( + $this->client, + $putBlockRequestGenerator(), + ['concurrency' => $options->maximumConcurrency], ); + + return $pool + ->promise() + ->then( + function () use (&$content, &$blocks, &$options) { + return $this->putBlockListAsync( + $blocks, + $options->contentType, + StreamUtils::hash($content, 'md5', true), + ); + }, + ); } private function putBlockAsync(Block $block, StreamInterface|string $content): PromiseInterface @@ -264,10 +285,10 @@ private function putBlockAsync(Block $block, StreamInterface|string $content): P /** * @param Block[] $blocks */ - private function putBlockList(array $blocks, ?string $contentType, string $contentMD5): void + private function putBlockListAsync(array $blocks, ?string $contentType, string $contentMD5): PromiseInterface { - try { - $this->client->put($this->uri, [ + return $this->client + ->putAsync($this->uri, [ 'query' => [ 'comp' => 'blocklist', ], @@ -277,22 +298,21 @@ private function putBlockList(array $blocks, ?string $contentType, string $conte ], 'body' => (new PutBlockRequestBody($blocks))->toXml()->asXML(), ]); - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); - } } public function copyFromUri(UriInterface $source): void { - try { - $this->client->put($this->uri, [ + $this->copyFromUriAsync($source)->wait(); + } + + public function copyFromUriAsync(UriInterface $source): PromiseInterface + { + return $this->client + ->putAsync($this->uri, [ 'headers' => [ 'x-ms-copy-source' => (string) $source, ], ]); - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); - } } public function generateSasUri(BlobSasBuilder $blobSasBuilder): UriInterface @@ -315,20 +335,24 @@ public function generateSasUri(BlobSasBuilder $blobSasBuilder): UriInterface /** * @param array $tags - * @return void */ public function setTags(array $tags): void { - try { - $this->client->put($this->uri, [ + $this->setTagsAsync($tags)->wait(); + } + + /** + * @param array $tags + */ + public function setTagsAsync(array $tags): PromiseInterface + { + return $this->client + ->putAsync($this->uri, [ 'query' => [ 'comp' => 'tags', ], 'body' => (new BlobTagsBody($tags))->toXml()->asXML(), ]); - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); - } } /** @@ -336,17 +360,20 @@ public function setTags(array $tags): void */ public function getTags(): array { - try { - $response = $this->client->get($this->uri, [ + /** @phpstan-ignore-next-line */ + return $this->getTagsAsync()->wait(); + } + + public function getTagsAsync(): PromiseInterface + { + return $this->client + ->getAsync($this->uri, [ 'query' => [ 'comp' => 'tags', ], - ]); - - $body = BlobTagsBody::fromXml(new \SimpleXMLElement($response->getBody()->getContents())); - return $body->tags; - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); - } + ]) + ->then( + fn(ResponseInterface $response) => BlobTagsBody::fromXml(new \SimpleXMLElement($response->getBody()->getContents()))->tags, + ); } } diff --git a/src/Blob/BlobContainerClient.php b/src/Blob/BlobContainerClient.php index 1efe14c..d1257d0 100644 --- a/src/Blob/BlobContainerClient.php +++ b/src/Blob/BlobContainerClient.php @@ -4,7 +4,7 @@ namespace AzureOss\Storage\Blob; -use AzureOss\Storage\Blob\Exceptions\BlobStorageExceptionFactory; +use AzureOss\Storage\Blob\Exceptions\BlobStorageExceptionDeserializer; use AzureOss\Storage\Blob\Exceptions\ContainerAlreadyExistsException; use AzureOss\Storage\Blob\Exceptions\ContainerNotFoundException; use AzureOss\Storage\Blob\Exceptions\InvalidBlobUriException; @@ -23,7 +23,7 @@ use AzureOss\Storage\Common\Middleware\ClientFactory; use AzureOss\Storage\Common\Sas\SasProtocol; use GuzzleHttp\Client; -use GuzzleHttp\Exception\RequestException; +use GuzzleHttp\Promise\PromiseInterface; use GuzzleHttp\Psr7\Uri; use Psr\Http\Message\UriInterface; @@ -31,8 +31,6 @@ final class BlobContainerClient { private readonly Client $client; - private readonly BlobStorageExceptionFactory $exceptionFactory; - public readonly string $containerName; /** @@ -43,8 +41,7 @@ public function __construct( public readonly ?StorageSharedKeyCredential $sharedKeyCredentials = null, ) { $this->containerName = BlobUriParserHelper::getContainerName($uri); - $this->client = (new ClientFactory())->create($uri, $sharedKeyCredentials); - $this->exceptionFactory = new BlobStorageExceptionFactory(); + $this->client = (new ClientFactory())->create($uri, $sharedKeyCredentials, new BlobStorageExceptionDeserializer()); } public function getBlobClient(string $blobName): BlobClient @@ -57,82 +54,105 @@ public function getBlobClient(string $blobName): BlobClient public function create(): void { - try { - $this->client->put($this->uri, [ - 'query' => [ - 'restype' => 'container', - ], - ]); - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); - } + $this->createAsync()->wait(); + } + + public function createAsync(): PromiseInterface + { + return $this->client->putAsync($this->uri, [ + 'query' => [ + 'restype' => 'container', + ], + ]); } public function createIfNotExists(): void { - try { - $this->create(); - } catch (ContainerAlreadyExistsException) { - // do nothing - } + $this->createIfNotExistsAsync()->wait(); + } + + public function createIfNotExistsAsync(): PromiseInterface + { + return $this->createAsync() + ->otherwise(function (\Throwable $e) { + if ($e instanceof ContainerAlreadyExistsException) { + return; + } + + throw $e; + }); } public function delete(): void { - try { - $this->client->delete($this->uri, [ - 'query' => [ - 'restype' => 'container', - ], - ]); - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); - } + $this->deleteAsync()->wait(); + } + + public function deleteAsync(): PromiseInterface + { + return $this->client->deleteAsync($this->uri, [ + 'query' => [ + 'restype' => 'container', + ], + ]); } public function deleteIfExists(): void { - try { - $this->delete(); - } catch (ContainerNotFoundException $e) { - // do nothing - } + $this->deleteIfExistsAsync()->wait(); + } + + public function deleteIfExistsAsync(): PromiseInterface + { + return $this->deleteAsync() + ->otherwise(function (\Throwable $e) { + if ($e instanceof ContainerNotFoundException) { + return; + } + + throw $e; + }); } public function exists(): bool { - try { - $this->client->head($this->uri, [ + /** @phpstan-ignore-next-line */ + return $this->existsAsync()->wait(); + } + + public function existsAsync(): PromiseInterface + { + return $this->client + ->headAsync($this->uri, [ 'query' => [ 'restype' => 'container', ], - ]); - - return true; - } catch (RequestException $e) { - $e = $this->exceptionFactory->create($e); - - if ($e instanceof ContainerNotFoundException) { - return false; - } + ]) + ->then(fn() => true) + ->otherwise(function (\Throwable $e) { + if ($e instanceof ContainerNotFoundException) { + return false; + } - throw $e; - } + throw $e; + }); } public function getProperties(): BlobContainerProperties { - try { - $response = $this->client->get($this->uri, [ + /** @phpstan-ignore-next-line */ + return $this->getPropertiesAsync()->wait(); + } + + public function getPropertiesAsync(): PromiseInterface + { + return $this->client + ->getAsync($this->uri, [ 'query' => [ 'restype' => 'container', ], - ]); - - return BlobContainerProperties::fromResponseHeaders($response); - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); - } + ]) + ->then(BlobContainerProperties::fromResponseHeaders(...)); } /** @@ -140,17 +160,21 @@ public function getProperties(): BlobContainerProperties */ public function setMetadata(array $metadata): void { - try { - $this->client->put($this->uri, [ - 'query' => [ - 'restype' => 'container', - 'comp' => 'metadata', - ], - 'headers' => MetadataHelper::metadataToHeaders($metadata), - ]); - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); - } + $this->setMetadataAsync($metadata)->wait(); + } + + /** + * @param array $metadata + */ + public function setMetadataAsync(array $metadata): PromiseInterface + { + return $this->client->putAsync($this->uri, [ + 'query' => [ + 'restype' => 'container', + 'comp' => 'metadata', + ], + 'headers' => MetadataHelper::metadataToHeaders($metadata), + ]); } /** @@ -202,22 +226,18 @@ public function getBlobsByHierarchy(?string $prefix = null, string $delimiter = private function listBlobs(?string $prefix, ?string $delimiter, string $marker, ?int $maxResults): ListBlobsResponseBody { - try { - $response = $this->client->get($this->uri, [ - 'query' => [ - 'restype' => 'container', - 'comp' => 'list', - 'prefix' => $prefix, - 'marker' => $marker !== "" ? $marker : null, - 'delimiter' => $delimiter, - 'maxresults' => $maxResults, - ], - ]); - - return ListBlobsResponseBody::fromXml(new \SimpleXMLElement($response->getBody()->getContents())); - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); - } + $response = $this->client->get($this->uri, [ + 'query' => [ + 'restype' => 'container', + 'comp' => 'list', + 'prefix' => $prefix, + 'marker' => $marker !== "" ? $marker : null, + 'delimiter' => $delimiter, + 'maxresults' => $maxResults, + ], + ]); + + return ListBlobsResponseBody::fromXml(new \SimpleXMLElement($response->getBody()->getContents())); } public function canGenerateSasUri(): bool @@ -248,32 +268,28 @@ public function generateSasUri(BlobSasBuilder $blobSasBuilder): UriInterface */ public function findBlobsByTag(string $tagFilterSqlExpression): \Generator { - try { - $nextMarker = ""; - - while (true) { - $response = $this->client->get($this->uri, [ - 'query' => [ - 'restype' => 'container', - 'comp' => 'blobs', - 'where' => $tagFilterSqlExpression, - 'marker' => $nextMarker !== "" ? $nextMarker : null, - ], - ]); - - $body = FindBlobsByTagBody::fromXml(new \SimpleXMLElement($response->getBody()->getContents())); - $nextMarker = $body->nextMarker; - - foreach ($body->blobs as $blob) { - yield $blob; - } + $nextMarker = ""; - if ($nextMarker === "") { - break; - } + while (true) { + $response = $this->client->get($this->uri, [ + 'query' => [ + 'restype' => 'container', + 'comp' => 'blobs', + 'where' => $tagFilterSqlExpression, + 'marker' => $nextMarker !== "" ? $nextMarker : null, + ], + ]); + + $body = FindBlobsByTagBody::fromXml(new \SimpleXMLElement($response->getBody()->getContents())); + $nextMarker = $body->nextMarker; + + foreach ($body->blobs as $blob) { + yield $blob; + } + + if ($nextMarker === "") { + break; } - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); } } } diff --git a/src/Blob/BlobServiceClient.php b/src/Blob/BlobServiceClient.php index 45188d8..1be7fde 100644 --- a/src/Blob/BlobServiceClient.php +++ b/src/Blob/BlobServiceClient.php @@ -4,7 +4,7 @@ namespace AzureOss\Storage\Blob; -use AzureOss\Storage\Blob\Exceptions\BlobStorageExceptionFactory; +use AzureOss\Storage\Blob\Exceptions\BlobStorageExceptionDeserializer; use AzureOss\Storage\Blob\Exceptions\InvalidConnectionStringException; use AzureOss\Storage\Blob\Exceptions\UnableToGenerateSasException; use AzureOss\Storage\Blob\Helpers\BlobUriParserHelper; @@ -19,7 +19,6 @@ use AzureOss\Storage\Common\Sas\AccountSasServices; use AzureOss\Storage\Common\Sas\SasProtocol; use GuzzleHttp\Client; -use GuzzleHttp\Exception\RequestException; use GuzzleHttp\Psr7\Uri; use Psr\Http\Message\UriInterface; @@ -27,16 +26,13 @@ final class BlobServiceClient { private readonly Client $client; - private readonly BlobStorageExceptionFactory $exceptionFactory; - public function __construct( public UriInterface $uri, public readonly ?StorageSharedKeyCredential $sharedKeyCredentials = null, ) { // must always include the forward slash (/) to separate the host name from the path and query portions of the URI. $this->uri = $uri->withPath(rtrim($uri->getPath(), '/') . "/"); - $this->client = (new ClientFactory())->create($this->uri, $sharedKeyCredentials); - $this->exceptionFactory = new BlobStorageExceptionFactory(); + $this->client = (new ClientFactory())->create($this->uri, $sharedKeyCredentials, new BlobStorageExceptionDeserializer()); } public static function fromConnectionString(string $connectionString): self @@ -73,30 +69,26 @@ public function getContainerClient(string $containerName): BlobContainerClient */ public function getBlobContainers(?string $prefix = null): \Generator { - try { - $nextMarker = ""; - - while (true) { - $response = $this->client->get($this->uri, [ - 'query' => [ - 'comp' => 'list', - 'marker' => $nextMarker !== "" ? $nextMarker : null, - 'prefix' => $prefix, - ], - ]); - $body = ListContainersResponseBody::fromXml(new \SimpleXMLElement($response->getBody()->getContents())); - $nextMarker = $body->nextMarker; - - foreach ($body->containers as $container) { - yield $container; - } - - if ($nextMarker === "") { - break; - } + $nextMarker = ""; + + while (true) { + $response = $this->client->get($this->uri, [ + 'query' => [ + 'comp' => 'list', + 'marker' => $nextMarker !== "" ? $nextMarker : null, + 'prefix' => $prefix, + ], + ]); + $body = ListContainersResponseBody::fromXml(new \SimpleXMLElement($response->getBody()->getContents())); + $nextMarker = $body->nextMarker; + + foreach ($body->containers as $container) { + yield $container; + } + + if ($nextMarker === "") { + break; } - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); } } @@ -107,31 +99,27 @@ public function getBlobContainers(?string $prefix = null): \Generator */ public function findBlobsByTag(string $tagFilterSqlExpression): \Generator { - try { - $nextMarker = ""; - - while (true) { - $response = $this->client->get($this->uri, [ - 'query' => [ - 'comp' => 'blobs', - 'where' => $tagFilterSqlExpression, - 'marker' => $nextMarker !== "" ? $nextMarker : null, - ], - ]); - - $body = FindBlobsByTagBody::fromXml(new \SimpleXMLElement($response->getBody()->getContents())); - $nextMarker = $body->nextMarker; - - foreach ($body->blobs as $blob) { - yield $blob; - } - - if ($nextMarker === "") { - break; - } + $nextMarker = ""; + + while (true) { + $response = $this->client->get($this->uri, [ + 'query' => [ + 'comp' => 'blobs', + 'where' => $tagFilterSqlExpression, + 'marker' => $nextMarker !== "" ? $nextMarker : null, + ], + ]); + + $body = FindBlobsByTagBody::fromXml(new \SimpleXMLElement($response->getBody()->getContents())); + $nextMarker = $body->nextMarker; + + foreach ($body->blobs as $blob) { + yield $blob; + } + + if ($nextMarker === "") { + break; } - } catch (RequestException $e) { - throw $this->exceptionFactory->create($e); } } diff --git a/src/Blob/Exceptions/BlobStorageExceptionFactory.php b/src/Blob/Exceptions/BlobStorageExceptionDeserializer.php similarity index 87% rename from src/Blob/Exceptions/BlobStorageExceptionFactory.php rename to src/Blob/Exceptions/BlobStorageExceptionDeserializer.php index eef3d22..56327b3 100644 --- a/src/Blob/Exceptions/BlobStorageExceptionFactory.php +++ b/src/Blob/Exceptions/BlobStorageExceptionDeserializer.php @@ -5,20 +5,16 @@ namespace AzureOss\Storage\Blob\Exceptions; use AzureOss\Storage\Blob\Responses\ErrorResponse; +use AzureOss\Storage\Common\Exceptions\RequestExceptionDeserializer; use GuzzleHttp\Exception\RequestException; use Psr\Http\Message\ResponseInterface; /** * @internal */ -final class BlobStorageExceptionFactory +final class BlobStorageExceptionDeserializer implements RequestExceptionDeserializer { - public function create(\Throwable $e): \Throwable - { - return $e instanceof RequestException ? $this->createFromRequestException($e) : $e; - } - - private function createFromRequestException(RequestException $e): \Exception + public function deserialize(RequestException $e): \Exception { $response = $e->getResponse(); if ($response === null) { diff --git a/src/Blob/Helpers/StreamHelper.php b/src/Blob/Helpers/StreamHelper.php new file mode 100644 index 0000000..3f95f44 --- /dev/null +++ b/src/Blob/Helpers/StreamHelper.php @@ -0,0 +1,55 @@ +detach(); + if ($detached === null) { + return $content; + } + + return self::createUploadStreamFromResource($detached, $blockSize); + } + + /** + * @param resource $content + */ + private static function createUploadStreamFromResource($content, int $blockSize): StreamInterface + { + stream_set_chunk_size($content, $blockSize); + + return StreamUtils::streamFor($content); + } +} diff --git a/src/Blob/Models/BlobDownloadStreamingResult.php b/src/Blob/Models/BlobDownloadStreamingResult.php index bf8d990..c9dc664 100644 --- a/src/Blob/Models/BlobDownloadStreamingResult.php +++ b/src/Blob/Models/BlobDownloadStreamingResult.php @@ -4,6 +4,7 @@ namespace AzureOss\Storage\Blob\Models; +use Psr\Http\Message\ResponseInterface; use Psr\Http\Message\StreamInterface; final class BlobDownloadStreamingResult @@ -12,4 +13,12 @@ public function __construct( public readonly StreamInterface $content, public readonly BlobProperties $properties, ) {} + + public static function fromResponse(ResponseInterface $response): self + { + return new self( + $response->getBody(), + BlobProperties::fromResponseHeaders($response), + ); + } } diff --git a/src/Common/Exceptions/RequestExceptionDeserializer.php b/src/Common/Exceptions/RequestExceptionDeserializer.php new file mode 100644 index 0000000..0da74ae --- /dev/null +++ b/src/Common/Exceptions/RequestExceptionDeserializer.php @@ -0,0 +1,15 @@ +before('http_errors', new DeserializeExceptionMiddleware($exceptionDeserializer)); $handlerStack->push(new AddXMsClientRequestIdMiddleware()); $handlerStack->push(new AddXMsDateHeaderMiddleware()); $handlerStack->push(new AddXMsVersionMiddleware(ApiVersion::LATEST)); diff --git a/src/Common/Middleware/DeserializeExceptionMiddleware.php b/src/Common/Middleware/DeserializeExceptionMiddleware.php new file mode 100644 index 0000000..5802f09 --- /dev/null +++ b/src/Common/Middleware/DeserializeExceptionMiddleware.php @@ -0,0 +1,32 @@ +otherwise(function (\Throwable $e) { + if ($e instanceof RequestException) { + throw $this->exceptionDeserializer->deserialize($e); + } + + throw $e; + }); + }; + } +} diff --git a/tests/Blob/Benchmark/BlobClientBench.php b/tests/Blob/Benchmark/BlobClientBench.php index 1319a04..6af8248 100644 --- a/tests/Blob/Benchmark/BlobClientBench.php +++ b/tests/Blob/Benchmark/BlobClientBench.php @@ -43,8 +43,8 @@ public function provideFiles(): \Generator { yield '20x10KB' => ['path' => FileFactory::create(10_000), 'count' => 100]; yield '10x10MB' => ['path' => FileFactory::create(10_000_000), 'count' => 10]; - yield '5x100MB' => ['path' => FileFactory::create(100_000_000), 'count' => 1]; - yield '2x800MB' => ['path' => FileFactory::create(800_000_000), 'count' => 1]; - yield '1x1.6GB' => ['path' => FileFactory::create(1_600_000_000), 'count' => 1]; + yield '5x100MB' => ['path' => FileFactory::create(100_000_000), 'count' => 5]; + yield '2x1GB' => ['path' => FileFactory::create(1_000_000_000), 'count' => 2]; + yield '1x4GB' => ['path' => FileFactory::create(4_000_000_000), 'count' => 1]; } } diff --git a/tests/Blob/Feature/BlobClientTest.php b/tests/Blob/Feature/BlobClientTest.php index f4c4057..7f65622 100644 --- a/tests/Blob/Feature/BlobClientTest.php +++ b/tests/Blob/Feature/BlobClientTest.php @@ -211,6 +211,11 @@ public function upload_works_with_unknown_sized_stream(): void $stream = new class ($file) implements StreamInterface { use StreamDecoratorTrait; + public function detach() + { + return null; + } + public function getSize(): ?int { return null; @@ -238,7 +243,14 @@ public function getSize(): ?int public function upload_works_with_non_seekable_stream(): void { FileFactory::withStream(1000, function (StreamInterface $file) { - $stream = new NoSeekStream($file); + $stream = new class (new NoSeekStream($file)) implements StreamInterface { + use StreamDecoratorTrait; + + public function detach() + { + return null; + } + }; $beforeUploadContent = $file->getContents(); $file->rewind();