diff --git a/.gitignore b/.gitignore index fcafa91..4c52107 100644 --- a/.gitignore +++ b/.gitignore @@ -45,4 +45,5 @@ build/ /main.py /pyproject.toml /uv.lock -**/__pycache__ \ No newline at end of file +**/__pycache__ +/dependency-reduced-pom.xml diff --git a/pom.xml b/pom.xml index 3df9b78..ec7c2ba 100644 --- a/pom.xml +++ b/pom.xml @@ -121,6 +121,42 @@ 4.13.1 test + + org.apache.commons + commons-compress + 1.28.0 + + + com.adobe.testing + s3mock + 2.17.0 + test + + + com.adobe.testing + s3mock-testcontainers + 2.17.0 + test + + + org.testcontainers + junit-jupiter + 1.20.4 + test + + + org.testcontainers + testcontainers + 1.20.4 + test + + + + ch.qos.logback + logback-classic + 1.4.14 + test + @@ -139,6 +175,10 @@ 3.2.5 false + + + 1.44 + diff --git a/src/main/java/dev/zarr/zarrjava/core/Array.java b/src/main/java/dev/zarr/zarrjava/core/Array.java index a8efaeb..a08f62b 100644 --- a/src/main/java/dev/zarr/zarrjava/core/Array.java +++ b/src/main/java/dev/zarr/zarrjava/core/Array.java @@ -3,6 +3,7 @@ import dev.zarr.zarrjava.ZarrException; import dev.zarr.zarrjava.core.codec.CodecPipeline; import dev.zarr.zarrjava.store.FilesystemStore; +import dev.zarr.zarrjava.store.Store; import dev.zarr.zarrjava.store.StoreHandle; import dev.zarr.zarrjava.utils.IndexingUtils; import dev.zarr.zarrjava.utils.MultiArrayUtils; @@ -16,6 +17,9 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import java.util.stream.Stream; public abstract class Array extends AbstractNode { @@ -289,6 +293,15 @@ public ucar.ma2.Array read(final long[] offset, final int[] shape, final boolean if (parallel) { chunkStream = chunkStream.parallel(); } + + boolean isListableStore = storeHandle.store instanceof Store.ListableStore; + Set> existingKeys; + if (isListableStore) { + existingKeys = storeHandle.list().map(Arrays::asList).collect(Collectors.toSet()); + } else { + existingKeys = null; + } + chunkStream.forEach( chunkCoords -> { try { @@ -306,9 +319,13 @@ public ucar.ma2.Array read(final long[] offset, final int[] shape, final boolean final String[] chunkKeys = metadata.chunkKeyEncoding().encodeChunkKey(chunkCoords); final StoreHandle chunkHandle = storeHandle.resolve(chunkKeys); - if (!chunkHandle.exists()) { - return; - } + + // chunkHandle.exists() can be expensive on some store types, so we optimize for ListableStore + if (isListableStore) { + if (existingKeys.stream().noneMatch(Arrays.asList(chunkKeys)::equals)) + return; + } else if (!chunkHandle.exists()) return; + if (codecPipeline.supportsPartialDecode()) { final ucar.ma2.Array chunkArray = codecPipeline.decodePartial(chunkHandle, Utils.toLongArray(chunkProjection.chunkOffset), chunkProjection.shape); diff --git a/src/main/java/dev/zarr/zarrjava/core/Group.java b/src/main/java/dev/zarr/zarrjava/core/Group.java index c89617a..7b425a3 100644 --- a/src/main/java/dev/zarr/zarrjava/core/Group.java +++ b/src/main/java/dev/zarr/zarrjava/core/Group.java @@ -9,7 +9,6 @@ import java.io.IOException; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.Objects; import java.util.stream.Stream; public abstract class Group extends AbstractNode { @@ -64,20 +63,15 @@ public static Group open(String path) throws IOException, ZarrException { } @Nullable - public abstract Node get(String key) throws ZarrException, IOException; + public abstract Node get(String[] key) throws ZarrException, IOException; - public Stream list() { - return storeHandle.list() - .map(key -> { - try { - return get(key); - } catch (Exception e) { - throw new RuntimeException(e); - } - }) - .filter(Objects::nonNull); + @Nullable + public Node get(String key) throws ZarrException, IOException { + return get(new String[]{key}); } + public abstract Stream list(); + public Node[] listAsArray() { try (Stream nodeStream = list()) { return nodeStream.toArray(Node[]::new); diff --git a/src/main/java/dev/zarr/zarrjava/store/BufferedZipStore.java b/src/main/java/dev/zarr/zarrjava/store/BufferedZipStore.java new file mode 100644 index 0000000..934da19 --- /dev/null +++ b/src/main/java/dev/zarr/zarrjava/store/BufferedZipStore.java @@ -0,0 +1,314 @@ +package dev.zarr.zarrjava.store; + +import org.apache.commons.compress.archivers.zip.Zip64Mode; +import org.apache.commons.compress.archivers.zip.ZipArchiveEntry; +import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream; +import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Comparator; +import java.util.stream.Stream; +import java.util.zip.CRC32; +import java.util.zip.ZipEntry; + + +/** + * A Store implementation that buffers reads and writes and flushes them to an underlying Store as a zip file. + */ +public class BufferedZipStore extends ZipStore { + + private final Store.ListableStore bufferStore; + private final boolean flushOnWrite; + private final Comparator zipEntryComparator = (a, b) -> { + boolean aIsZarr = a.length > 0 && a[a.length - 1].equals("zarr.json"); + boolean bIsZarr = b.length > 0 && b[b.length - 1].equals("zarr.json"); + // first all zarr.json files + if (aIsZarr && !bIsZarr) { + return -1; + } else if (!aIsZarr && bIsZarr) { + return 1; + } else if (aIsZarr && bIsZarr) { + // sort zarr.json in BFS order within same depth by lexicographical order + if (a.length != b.length) { + return Integer.compare(a.length, b.length); + } else { + return String.join("/", a).compareTo(String.join("/", b)); + } + } else { + // then all other files in lexicographical order + return String.join("/", a).compareTo(String.join("/", b)); + } + }; + private String archiveComment; + + public BufferedZipStore(@Nonnull StoreHandle underlyingStore, @Nonnull Store.ListableStore bufferStore, @Nullable String archiveComment, boolean flushOnWrite) { + super(underlyingStore); + this.bufferStore = bufferStore; + this.archiveComment = archiveComment; + this.flushOnWrite = flushOnWrite; + try { + loadBuffer(); + } catch (IOException e) { + throw new RuntimeException("Failed to load buffer from underlying store", e); + } + } + + public BufferedZipStore(@Nonnull StoreHandle underlyingStore, @Nonnull Store.ListableStore bufferStore, @Nullable String archiveComment) { + this(underlyingStore, bufferStore, archiveComment, false); + } + + public BufferedZipStore(@Nonnull StoreHandle underlyingStore, @Nonnull Store.ListableStore bufferStore) { + this(underlyingStore, bufferStore, null); + } + + public BufferedZipStore(@Nonnull StoreHandle underlyingStore, String archiveComment) { + this(underlyingStore, new MemoryStore(), archiveComment); + } + + public BufferedZipStore(@Nonnull StoreHandle underlyingStore) { + this(underlyingStore, (String) null); + } + + public BufferedZipStore(@Nonnull Path underlyingStore, String archiveComment) { + this(new FilesystemStore(underlyingStore.getParent()).resolve(underlyingStore.getFileName().toString()), archiveComment); + } + + public BufferedZipStore(@Nonnull Path underlyingStore) { + this(underlyingStore, null); + } + + public BufferedZipStore(@Nonnull String underlyingStorePath, String archiveComment) { + this(Paths.get(underlyingStorePath), archiveComment); + } + + public BufferedZipStore(@Nonnull String underlyingStorePath) { + this(underlyingStorePath, null); + } + + public BufferedZipStore(@Nonnull StoreHandle underlyingStore, @Nonnull Store.ListableStore bufferStore, boolean flushOnWrite) { + this(underlyingStore, bufferStore, null, flushOnWrite); + } + + public BufferedZipStore(@Nonnull StoreHandle underlyingStore, String archiveComment, boolean flushOnWrite) { + this(underlyingStore, new MemoryStore(), archiveComment, flushOnWrite); + } + + public BufferedZipStore(@Nonnull StoreHandle underlyingStore, boolean flushOnWrite) { + this(underlyingStore, (String) null, flushOnWrite); + } + + public BufferedZipStore(@Nonnull Path underlyingStore, String archiveComment, boolean flushOnWrite) { + this(new FilesystemStore(underlyingStore.getParent()).resolve(underlyingStore.getFileName().toString()), archiveComment, flushOnWrite); + } + + public BufferedZipStore(@Nonnull Path underlyingStore, boolean flushOnWrite) { + this(underlyingStore, null, flushOnWrite); + } + + public BufferedZipStore(@Nonnull String underlyingStorePath, String archiveComment, boolean flushOnWrite) { + this(Paths.get(underlyingStorePath), archiveComment, flushOnWrite); + } + + public BufferedZipStore(@Nonnull String underlyingStorePath, boolean flushOnWrite) { + this(underlyingStorePath, null, flushOnWrite); + } + + private void writeBuffer() throws IOException { + // create zip file bytes from buffer store and write to underlying store + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ZipArchiveOutputStream zos = new ZipArchiveOutputStream(baos)) { + zos.setUseZip64(Zip64Mode.AsNeeded); + if (archiveComment != null) { + zos.setComment(archiveComment); + } + bufferStore.list().sorted(zipEntryComparator).forEach(keys -> { + try { + if (keys == null || keys.length == 0) { + // skip root entry + return; + } + String entryName = String.join("/", keys); + ByteBuffer bb = bufferStore.get(keys); + if (bb == null) { + // directory entry: ensure trailing slash + if (!entryName.endsWith("/")) { + entryName = entryName + "/"; + } + ZipArchiveEntry dirEntry = new ZipArchiveEntry(entryName); + dirEntry.setMethod(ZipEntry.STORED); + dirEntry.setSize(0); + dirEntry.setCrc(0); + zos.putArchiveEntry(dirEntry); + zos.closeArchiveEntry(); + } else { + // read bytes from ByteBuffer without modifying original + ByteBuffer dup = bb.duplicate(); + int len = dup.remaining(); + byte[] bytes = new byte[len]; + dup.get(bytes); + + // compute CRC and set size for STORED (no compression) + CRC32 crc = new CRC32(); + crc.update(bytes, 0, bytes.length); + ZipArchiveEntry fileEntry = new ZipArchiveEntry(entryName); + fileEntry.setMethod(ZipEntry.STORED); + fileEntry.setSize(bytes.length); + fileEntry.setCrc(crc.getValue()); + + zos.putArchiveEntry(fileEntry); + zos.write(bytes); + zos.closeArchiveEntry(); + } + } catch (IOException e) { + // wrap checked exception so it can be rethrown from stream for handling below + throw new RuntimeException(e); + } + }); + zos.finish(); + } catch (RuntimeException e) { + // unwrap and rethrow IOExceptions thrown inside the lambda + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } + throw e; + } + + byte[] zipBytes = baos.toByteArray(); + // write zip bytes back to underlying store + underlyingStore.set(ByteBuffer.wrap(zipBytes)); + } + + public void deleteArchiveComment() throws IOException { + this.setArchiveComment(null); + } + + /** + * Loads the buffer from the underlying store zip file. + */ + private void loadBuffer() throws IOException { + String loadedArchiveComment = super.getArchiveComment(); + if (loadedArchiveComment != null && this.archiveComment == null) { + // don't overwrite existing archiveComment + this.archiveComment = loadedArchiveComment; + } + + InputStream inputStream = underlyingStore.getInputStream(); + if (inputStream == null) { + return; + } + try (ZipArchiveInputStream zis = new ZipArchiveInputStream(inputStream)) { + ZipArchiveEntry entry; + while ((entry = zis.getNextEntry()) != null) { + if (entry.isDirectory()) { + continue; + } + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + byte[] tmp = new byte[8192]; + int read; + while ((read = zis.read(tmp)) != -1) { + baos.write(tmp, 0, read); + } + byte[] bytes = baos.toByteArray(); + bufferStore.set(new String[]{entry.getName()}, ByteBuffer.wrap(bytes)); + } + } + } + + /** + * Flushes the buffer and archiveComment to the underlying store as a zip file. + */ + public void flush() throws IOException { + writeBuffer(); + } + + @Override + public String getArchiveComment() { + return archiveComment; + } + + public void setArchiveComment(@Nullable String archiveComment) throws IOException { + this.archiveComment = archiveComment; + if (flushOnWrite) { + writeBuffer(); + } + } + + @Override + public Stream list(String[] keys) { + return bufferStore.list(keys); + } + + @Override + public boolean exists(String[] keys) { + return bufferStore.exists(keys); + } + + @Nullable + @Override + public ByteBuffer get(String[] keys) { + return bufferStore.get(keys); + } + + @Nullable + @Override + public ByteBuffer get(String[] keys, long start) { + return bufferStore.get(keys, start); + } + + @Nullable + @Override + public ByteBuffer get(String[] keys, long start, long end) { + return bufferStore.get(keys, start, end); + } + + @Override + public void set(String[] keys, ByteBuffer bytes) { + bufferStore.set(keys, bytes); + if (flushOnWrite) { + try { + writeBuffer(); + } catch (IOException e) { + throw new RuntimeException("Failed to flush buffer to underlying store after set operation", e); + } + } + } + + @Override + public void delete(String[] keys) { + bufferStore.delete(keys); + if (flushOnWrite) { + try { + writeBuffer(); + } catch (IOException e) { + throw new RuntimeException("Failed to flush buffer to underlying store after delete operation", e); + } + } + } + + @Nonnull + @Override + public StoreHandle resolve(String... keys) { + return new StoreHandle(this, keys); + } + + @Override + public InputStream getInputStream(String[] keys, long start, long end) { + return bufferStore.getInputStream(keys, start, end); + } + + public long getSize(String[] keys) { + return bufferStore.getSize(keys); + } + + @Override + public String toString() { + return "BufferedZipStore(" + underlyingStore.toString() + ")"; + } +} diff --git a/src/main/java/dev/zarr/zarrjava/store/FilesystemStore.java b/src/main/java/dev/zarr/zarrjava/store/FilesystemStore.java index 3d2e447..90da970 100644 --- a/src/main/java/dev/zarr/zarrjava/store/FilesystemStore.java +++ b/src/main/java/dev/zarr/zarrjava/store/FilesystemStore.java @@ -1,10 +1,12 @@ package dev.zarr.zarrjava.store; import dev.zarr.zarrjava.utils.Utils; +import org.apache.commons.io.input.BoundedInputStream; import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.channels.SeekableByteChannel; import java.nio.file.*; @@ -118,9 +120,19 @@ public void delete(String[] keys) { } } - public Stream list(String[] keys) { + public Stream list(String[] keys) { + Path keyPath = resolveKeys(keys); try { - return Files.list(resolveKeys(keys)).map(p -> p.toFile().getName()); + return Files.walk(keyPath) + .filter(path -> !path.equals(keyPath)) + .map(path -> { + Path relativePath = keyPath.relativize(path); + String[] parts = new String[relativePath.getNameCount()]; + for (int i = 0; i < relativePath.getNameCount(); i++) { + parts[i] = relativePath.getName(i).toString(); + } + return parts; + }); } catch (IOException e) { throw new RuntimeException(e); } @@ -137,4 +149,38 @@ public String toString() { return this.path.toUri().toString().replaceAll("\\/$", ""); } + @Override + public InputStream getInputStream(String[] keys, long start, long end) { + Path keyPath = resolveKeys(keys); + try { + if (!Files.exists(keyPath)) { + return null; + } + InputStream inputStream = Files.newInputStream(keyPath); + if (start > 0) { + long skipped = inputStream.skip(start); + if (skipped < start) { + throw new IOException("Unable to skip to the desired start position."); + } + } + if (end != -1) { + long bytesToRead = end - start; + return new BoundedInputStream(inputStream, bytesToRead); + } else { + return inputStream; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public long getSize(String[] keys) { + try { + return Files.size(resolveKeys(keys)); + } catch (NoSuchFileException e) { + return -1; + } catch (IOException e) { + throw new RuntimeException(e); + } + } } diff --git a/src/main/java/dev/zarr/zarrjava/store/HttpStore.java b/src/main/java/dev/zarr/zarrjava/store/HttpStore.java index cb7ddc2..31d5a0a 100644 --- a/src/main/java/dev/zarr/zarrjava/store/HttpStore.java +++ b/src/main/java/dev/zarr/zarrjava/store/HttpStore.java @@ -4,7 +4,9 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.io.FilterInputStream; import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; public class HttpStore implements Store { @@ -100,4 +102,59 @@ public StoreHandle resolve(String... keys) { public String toString() { return uri; } + + @Override + @Nullable + public InputStream getInputStream(String[] keys, long start, long end) { + if (start < 0) { + throw new IllegalArgumentException("Argument 'start' needs to be non-negative."); + } + Request request = new Request.Builder().url(resolveKeys(keys)).header( + "Range", String.format("Bytes=%d-%d", start, end - 1)).build(); + Call call = httpClient.newCall(request); + try { + Response response = call.execute(); + ResponseBody body = response.body(); + if (body == null) return null; + InputStream stream = body.byteStream(); + + // Ensure closing the stream also closes the response + return new FilterInputStream(stream) { + @Override + public void close() throws IOException { + super.close(); + body.close(); + } + }; + } catch (IOException e) { + return null; + } + } + + @Override + public long getSize(String[] keys) { + // Explicitly request "identity" encoding to prevent OkHttp from adding "gzip" + // and subsequently stripping the Content-Length header. + Request request = new Request.Builder() + .head() + .url(resolveKeys(keys)) + .header("Accept-Encoding", "identity") + .build(); + + Call call = httpClient.newCall(request); + try { + Response response = call.execute(); + if (!response.isSuccessful()) { + return -1; + } + + String contentLength = response.header("Content-Length"); + if (contentLength != null) { + return Long.parseLong(contentLength); + } + return -1; + } catch (IOException e) { + throw new RuntimeException(e); + } + } } diff --git a/src/main/java/dev/zarr/zarrjava/store/MemoryStore.java b/src/main/java/dev/zarr/zarrjava/store/MemoryStore.java index 42ac6ff..ea855c0 100644 --- a/src/main/java/dev/zarr/zarrjava/store/MemoryStore.java +++ b/src/main/java/dev/zarr/zarrjava/store/MemoryStore.java @@ -2,6 +2,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.io.InputStream; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -45,7 +46,7 @@ public ByteBuffer get(String[] keys, long start, long end) { if (bytes == null) return null; if (end < 0) end = bytes.length; if (end > Integer.MAX_VALUE) throw new IllegalArgumentException("End index too large"); - return ByteBuffer.wrap(bytes, (int) start, (int) end); + return ByteBuffer.wrap(bytes, (int) start, (int) (end - start)); } @@ -59,19 +60,18 @@ public void delete(String[] keys) { map.remove(resolveKeys(keys)); } - public Stream list(String[] keys) { + public Stream list(String[] keys) { List prefix = resolveKeys(keys); - Set allKeys = new HashSet<>(); + Set> allKeys = new HashSet<>(); for (List k : map.keySet()) { if (k.size() <= prefix.size() || !k.subList(0, prefix.size()).equals(prefix)) continue; - for (int i = 0; i < k.size(); i++) { - List subKey = k.subList(0, i + 1); - allKeys.add(String.join("/", subKey)); + for (int i = prefix.size(); i < k.size(); i++) { + allKeys.add(k.subList(prefix.size(), i + 1)); } } - return allKeys.stream(); + return allKeys.stream().map(k -> k.toArray(new String[0])); } @Nonnull @@ -84,5 +84,22 @@ public StoreHandle resolve(String... keys) { public String toString() { return String.format("", hashCode()); } -} + @Override + public InputStream getInputStream(String[] keys, long start, long end) { + byte[] bytes = map.get(resolveKeys(keys)); + if (bytes == null) return null; + if (end < 0) end = bytes.length; + if (end > Integer.MAX_VALUE) throw new IllegalArgumentException("End index too large"); + return new java.io.ByteArrayInputStream(bytes, (int) start, (int) (end - start)); + } + + @Override + public long getSize(String[] keys) { + byte[] bytes = map.get(resolveKeys(keys)); + if (bytes == null) { + return -1; + } + return bytes.length; + } +} diff --git a/src/main/java/dev/zarr/zarrjava/store/ReadOnlyZipStore.java b/src/main/java/dev/zarr/zarrjava/store/ReadOnlyZipStore.java new file mode 100644 index 0000000..8b906fd --- /dev/null +++ b/src/main/java/dev/zarr/zarrjava/store/ReadOnlyZipStore.java @@ -0,0 +1,225 @@ +package dev.zarr.zarrjava.store; + +import org.apache.commons.compress.archivers.zip.ZipArchiveEntry; +import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream; +import org.apache.commons.io.input.BoundedInputStream; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.stream.Stream; + + +/** + * A Store implementation that provides read-only access to a zip archive stored in an underlying Store. + * Compared to BufferedZipStore, this implementation reads directly from the zip archive without parsing + * its contents into a buffer store first making it more efficient for read-only access to large zip archives. + */ +public class ReadOnlyZipStore extends ZipStore { + + public ReadOnlyZipStore(@Nonnull StoreHandle underlyingStore) { + super(underlyingStore); + } + + public ReadOnlyZipStore(@Nonnull Path underlyingStore) { + this(new FilesystemStore(underlyingStore.getParent()).resolve(underlyingStore.getFileName().toString())); + } + + public ReadOnlyZipStore(@Nonnull String underlyingStorePath) { + this(Paths.get(underlyingStorePath)); + } + + String resolveKeys(String[] keys) { + return String.join("/", keys); + } + + String[] resolveEntryKeys(String entryKey) { + return entryKey.split("/"); + } + + @Override + public boolean exists(String[] keys) { + return get(keys, 0, 0) != null; + } + + @Nullable + @Override + public ByteBuffer get(String[] keys) { + return get(keys, 0); + } + + @Nullable + @Override + public ByteBuffer get(String[] keys, long start) { + return get(keys, start, -1); + } + + @Nullable + @Override + public ByteBuffer get(String[] keys, long start, long end) { + InputStream inputStream = underlyingStore.getInputStream(); + if (inputStream == null) { + return null; + } + try (ZipArchiveInputStream zis = new ZipArchiveInputStream(inputStream)) { + ZipArchiveEntry entry; + while ((entry = zis.getNextEntry()) != null) { + String entryName = entry.getName(); + + if (entryName.startsWith("/")) { + entryName = entryName.substring(1); + } + if (entry.isDirectory() || !entryName.equals(resolveKeys(keys))) { + continue; + } + + long skipResult = zis.skip(start); + if (skipResult != start) { + throw new IOException("Failed to skip to start position " + start + " in zip entry " + entryName); + } + + long bytesToRead; + if (end != -1) bytesToRead = end - start; + else bytesToRead = Long.MAX_VALUE; + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + byte[] bufferArray = new byte[8192]; + int len; + while (bytesToRead > 0 && (len = zis.read(bufferArray, 0, (int) Math.min(bufferArray.length, bytesToRead))) != -1) { + baos.write(bufferArray, 0, len); + bytesToRead -= len; + } + byte[] bytes = baos.toByteArray(); + return ByteBuffer.wrap(bytes); + } + } catch (IOException e) { + return null; + } + return null; + } + + @Override + public void set(String[] keys, ByteBuffer bytes) { + throw new UnsupportedOperationException("ReadOnlyZipStore does not support set operation."); + } + + @Override + public void delete(String[] keys) { + throw new UnsupportedOperationException("ReadOnlyZipStore does not support delete operation."); + } + + @Nonnull + @Override + public StoreHandle resolve(String... keys) { + return new StoreHandle(this, keys); + } + + @Override + public String toString() { + return "ReadOnlyZipStore(" + underlyingStore.toString() + ")"; + } + + @Override + public Stream list(String[] keys) { + Stream.Builder builder = Stream.builder(); + + InputStream inputStream = underlyingStore.getInputStream(); + if (inputStream == null) { + return builder.build(); + } + try (ZipArchiveInputStream zis = new ZipArchiveInputStream(inputStream)) { + ZipArchiveEntry entry; + String prefix = resolveKeys(keys); + while ((entry = zis.getNextEntry()) != null) { + String entryName = entry.getName(); + if (entryName.startsWith("/")) { + entryName = entryName.substring(1); + } + if (entryName.endsWith("/")) { + entryName = entryName.substring(0, entryName.length() - 1); + } + if (!entryName.startsWith(prefix) || entryName.equals(prefix)) { + continue; + } + String[] entryKeys = resolveEntryKeys(entryName.substring(prefix.length() + 1)); + builder.add(entryKeys); + } + } catch (IOException ignored) { + } + return builder.build(); + } + + @Override + public InputStream getInputStream(String[] keys, long start, long end) { + InputStream baseStream = underlyingStore.getInputStream(); + + try { + ZipArchiveInputStream zis = new ZipArchiveInputStream(baseStream); + ZipArchiveEntry entry; + while ((entry = zis.getNextEntry()) != null) { + String entryName = entry.getName(); + + if (entryName.startsWith("/")) { + entryName = entryName.substring(1); + } + if (entry.isDirectory() || !entryName.equals(resolveKeys(keys))) { + continue; + } + + long skipResult = zis.skip(start); + if (skipResult != start) { + throw new IOException("Failed to skip to start position " + start + " in zip entry " + entryName); + } + + long bytesToRead; + if (end != -1) bytesToRead = end - start; + else bytesToRead = Long.MAX_VALUE; + + return new BoundedInputStream(zis, bytesToRead); + } + return null; + } catch (IOException ignored) { + } + return null; + } + + @Override + public long getSize(String[] keys) { + InputStream inputStream = underlyingStore.getInputStream(); + if (inputStream == null) { + throw new RuntimeException(new IOException("Underlying store input stream is null")); + } + try (ZipArchiveInputStream zis = new ZipArchiveInputStream(inputStream)) { + ZipArchiveEntry entry; + while ((entry = zis.getNextEntry()) != null) { + String entryName = entry.getName(); + + if (entryName.startsWith("/")) { + entryName = entryName.substring(1); + } + if (entry.isDirectory() || !entryName.equals(resolveKeys(keys))) { + continue; + } + long size = entry.getSize(); + if (size < 0) { + // read the entire entry to determine size + size = 0; + byte[] bufferArray = new byte[8192]; + int len; + while ((len = zis.read(bufferArray)) != -1) { + size += len; + } + } + return size; + } + return -1; // file not found + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/main/java/dev/zarr/zarrjava/store/S3Store.java b/src/main/java/dev/zarr/zarrjava/store/S3Store.java index ff1ad7d..2b9c73d 100644 --- a/src/main/java/dev/zarr/zarrjava/store/S3Store.java +++ b/src/main/java/dev/zarr/zarrjava/store/S3Store.java @@ -71,7 +71,7 @@ public ByteBuffer get(String[] keys, long start) { GetObjectRequest req = GetObjectRequest.builder() .bucket(bucketName) .key(resolveKeys(keys)) - .range(String.valueOf(start)) + .range(String.format("bytes=%d-", start)) .build(); return get(req); } @@ -82,7 +82,7 @@ public ByteBuffer get(String[] keys, long start, long end) { GetObjectRequest req = GetObjectRequest.builder() .bucket(bucketName) .key(resolveKeys(keys)) - .range(start + "-" + end) + .range(String.format("bytes=%d-%d", start, end - 1)) // S3 range is inclusive .build(); return get(req); } @@ -104,7 +104,7 @@ public void delete(String[] keys) { } @Override - public Stream list(String[] keys) { + public Stream list(String[] keys) { final String fullKey = resolveKeys(keys); ListObjectsRequest req = ListObjectsRequest.builder() .bucket(bucketName).prefix(fullKey) @@ -112,7 +112,7 @@ public Stream list(String[] keys) { ListObjectsResponse res = s3client.listObjects(req); return res.contents() .stream() - .map(p -> p.key().substring(fullKey.length() + 1)); + .map(p -> p.key().substring(fullKey.length() + 1).split("/")); } @Nonnull @@ -121,6 +121,30 @@ public StoreHandle resolve(String... keys) { return new StoreHandle(this, keys); } + @Override + public InputStream getInputStream(String[] keys, long start, long end) { + GetObjectRequest req = GetObjectRequest.builder() + .bucket(bucketName) + .key(resolveKeys(keys)) + .range(String.format("bytes=%d-%d", start, end - 1)) // S3 range is inclusive + .build(); + ResponseInputStream responseInputStream = s3client.getObject(req); + return responseInputStream; + } + + @Override + public long getSize(String[] keys) { + HeadObjectRequest req = HeadObjectRequest.builder() + .bucket(bucketName) + .key(resolveKeys(keys)) + .build(); + try { + return s3client.headObject(req).contentLength(); + } catch (NoSuchKeyException e) { + return -1; + } + } + @Override public String toString() { return "s3://" + bucketName + "/" + prefix; diff --git a/src/main/java/dev/zarr/zarrjava/store/Store.java b/src/main/java/dev/zarr/zarrjava/store/Store.java index 41996a6..3747aed 100644 --- a/src/main/java/dev/zarr/zarrjava/store/Store.java +++ b/src/main/java/dev/zarr/zarrjava/store/Store.java @@ -2,6 +2,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.io.InputStream; import java.nio.ByteBuffer; import java.util.stream.Stream; @@ -25,8 +26,35 @@ public interface Store { @Nonnull StoreHandle resolve(String... keys); + InputStream getInputStream(String[] keys, long start, long end); + + default InputStream getInputStream(String[] keys) { + return getInputStream(keys, 0, -1); + } + + /** + * Gets the size in bytes of the data stored at the given keys. + * + * @param keys The keys identifying the data. + * @return The size in bytes of the data stored at the given keys. -1 if the keys do not exist. + */ + long getSize(String[] keys); + interface ListableStore extends Store { - Stream list(String[] keys); + /** + * Lists all keys in the store that match the given prefix keys. Keys are represented as arrays of strings, + * where each string is a segment of the key path. + * Keys that are exactly equal to the prefix are not included in the results. + * Keys that do not contain data (i.e. "directories") are included in the results. + * + * @param keys The prefix keys to match. + * @return A stream of key arrays that match the given prefix. Prefixed keys are not included in the results. + */ + Stream list(String[] keys); + + default Stream list() { + return list(new String[]{}); + } } } diff --git a/src/main/java/dev/zarr/zarrjava/store/StoreHandle.java b/src/main/java/dev/zarr/zarrjava/store/StoreHandle.java index d435646..e7bd8eb 100644 --- a/src/main/java/dev/zarr/zarrjava/store/StoreHandle.java +++ b/src/main/java/dev/zarr/zarrjava/store/StoreHandle.java @@ -4,6 +4,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.file.NoSuchFileException; import java.nio.file.Path; @@ -12,9 +13,9 @@ public class StoreHandle { @Nonnull - final Store store; + public final Store store; @Nonnull - final String[] keys; + public final String[] keys; public StoreHandle(@Nonnull Store store, @Nonnull String... keys) { this.store = store; @@ -45,6 +46,14 @@ public ByteBuffer read(long start, long end) { return store.get(keys, start, end); } + public InputStream getInputStream(int start, int end) { + return store.getInputStream(keys, start, end); + } + + public InputStream getInputStream() { + return store.getInputStream(keys); + } + public void set(ByteBuffer bytes) { store.set(keys, bytes); } @@ -57,13 +66,17 @@ public boolean exists() { return store.exists(keys); } - public Stream list() { + public Stream list() { if (!(store instanceof Store.ListableStore)) { throw new UnsupportedOperationException("The underlying store does not support listing."); } return ((Store.ListableStore) store).list(keys); } + public long getSize() { + return store.getSize(keys); + } + @Override public String toString() { return store + "/" + String.join("/", keys); diff --git a/src/main/java/dev/zarr/zarrjava/store/ZipStore.java b/src/main/java/dev/zarr/zarrjava/store/ZipStore.java new file mode 100644 index 0000000..603d8e3 --- /dev/null +++ b/src/main/java/dev/zarr/zarrjava/store/ZipStore.java @@ -0,0 +1,83 @@ +package dev.zarr.zarrjava.store; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.IOException; +import java.nio.ByteBuffer; + +public abstract class ZipStore implements Store, Store.ListableStore { + public final StoreHandle underlyingStore; + + public ZipStore(@Nonnull StoreHandle underlyingStore) { + this.underlyingStore = underlyingStore; + } + + // adopted from https://stackoverflow.com/a/9918966 + @Nullable + public static String getZipCommentFromBuffer(byte[] bufArray) throws IOException { + // End of Central Directory (EOCD) record magic number + byte[] EOCD = {0x50, 0x4b, 0x05, 0x06}; + int buffLen = bufArray.length; + // Check the buffer from the end + search: + for (int i = buffLen - EOCD.length - 22; i >= 0; i--) { + for (int k = 0; k < EOCD.length; k++) { + if (bufArray[i + k] != EOCD[k]) { + continue search; + } + } + // End of Central Directory found! + int commentLen = bufArray[i + 20] + bufArray[i + 21] * 256; + int realLen = buffLen - i - 22; + if (commentLen != realLen) { + throw new IOException("ZIP comment size mismatch: " + + "directory says len is " + commentLen + + ", but file ends after " + realLen + " bytes!"); + } + return new String(bufArray, i + 22, commentLen); + } + return null; + } + + public String getArchiveComment() throws IOException { + // Attempt to read from the end of the file to find the EOCD record. + // We try a small chunk first (1KB) which covers most short comments (or no comment), + // then the maximum possible EOCD size (approx 65KB). + long fileSize = underlyingStore.getSize(); + if (fileSize < 22) { + return null; + } + int[] readSizes = {1024, 65535 + 22}; + + for (int size : readSizes) { + ByteBuffer buffer; + + if (fileSize < size) { + buffer = underlyingStore.read(); + } else { + buffer = underlyingStore.read(fileSize - size); + } + + if (buffer == null) { + return null; + } + + byte[] bufArray; + if (buffer.hasArray()) { + bufArray = buffer.array(); + } else { + bufArray = new byte[buffer.remaining()]; + buffer.duplicate().get(bufArray); + } + + String comment = getZipCommentFromBuffer(bufArray); + if (comment != null) { + return comment; + } + if (fileSize < size) { + break; + } + } + return null; + } +} \ No newline at end of file diff --git a/src/main/java/dev/zarr/zarrjava/v2/Group.java b/src/main/java/dev/zarr/zarrjava/v2/Group.java index 29bd477..d3229e4 100644 --- a/src/main/java/dev/zarr/zarrjava/v2/Group.java +++ b/src/main/java/dev/zarr/zarrjava/v2/Group.java @@ -16,7 +16,10 @@ import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Objects; import java.util.function.Function; +import java.util.stream.Stream; import static dev.zarr.zarrjava.v2.Node.makeObjectMapper; import static dev.zarr.zarrjava.v2.Node.makeObjectWriter; @@ -170,7 +173,7 @@ public static Group create(String path, Attributes attributes) throws IOExceptio * @throws IOException if there is an error accessing the storage */ @Nullable - public Node get(String key) throws ZarrException, IOException { + public Node get(String[] key) throws ZarrException, IOException { StoreHandle keyHandle = storeHandle.resolve(key); try { return Node.open(keyHandle); @@ -179,6 +182,26 @@ public Node get(String key) throws ZarrException, IOException { } } + @Override + public Stream list() { + return storeHandle.list().map(key -> { + if (key.length <= 1) return null; // exclude root from list + String fileName = key[key.length - 1]; + StoreHandle parent = storeHandle.resolve(Arrays.copyOf(key, key.length - 1)); + try { + if (fileName.equals(ZARRAY)) { + return Array.open(parent); + } + if (fileName.equals(ZGROUP)) { + return (dev.zarr.zarrjava.core.Node) Group.open(parent); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + return null; + }).filter(Objects::nonNull); + } + /** * Creates a new subgroup with default metadata at the specified key. * diff --git a/src/main/java/dev/zarr/zarrjava/v3/Group.java b/src/main/java/dev/zarr/zarrjava/v3/Group.java index 305dcd1..5df6d5b 100644 --- a/src/main/java/dev/zarr/zarrjava/v3/Group.java +++ b/src/main/java/dev/zarr/zarrjava/v3/Group.java @@ -15,7 +15,9 @@ import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Arrays; import java.util.function.Function; +import java.util.stream.Stream; import static dev.zarr.zarrjava.v3.Node.makeObjectMapper; import static dev.zarr.zarrjava.v3.Node.makeObjectWriter; @@ -112,10 +114,7 @@ public static Group create(@Nonnull StoreHandle storeHandle, @Nonnull GroupMetad * @throws IOException if the metadata cannot be serialized * @throws ZarrException if the attributes are invalid */ - public static Group create( - @Nonnull StoreHandle storeHandle, - @Nonnull Attributes attributes - ) throws IOException, ZarrException { + public static Group create(@Nonnull StoreHandle storeHandle, @Nonnull Attributes attributes) throws IOException, ZarrException { return create(storeHandle, new GroupMetadata(attributes)); } @@ -184,7 +183,7 @@ public static Group create(String path) throws IOException, ZarrException { * @throws IOException if there is an error accessing the storage */ @Nullable - public Node get(String key) throws ZarrException, IOException { + public Node get(String[] key) throws ZarrException, IOException { StoreHandle keyHandle = storeHandle.resolve(key); try { return Node.open(keyHandle); @@ -193,6 +192,21 @@ public Node get(String key) throws ZarrException, IOException { } } + @Override + public Stream list() { + Stream metadataKeys = storeHandle.list() + .filter(key -> key[key.length - 1].equals(ZARR_JSON)) + .filter(key -> key.length > 1); // exclude root from list + return metadataKeys.map(key -> { + try { + return get(Arrays.copyOf(key, key.length - 1)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + /** * Creates a new subgroup with the provided metadata at the specified key. * @@ -200,8 +214,7 @@ public Node get(String key) throws ZarrException, IOException { * @param groupMetadata the metadata of the Zarr group * @throws IOException if the metadata cannot be serialized */ - public Group createGroup(String key, GroupMetadata groupMetadata) - throws IOException, ZarrException { + public Group createGroup(String key, GroupMetadata groupMetadata) throws IOException, ZarrException { return Group.create(storeHandle.resolve(key), groupMetadata); } @@ -212,8 +225,7 @@ public Group createGroup(String key, GroupMetadata groupMetadata) * @param attributes attributes of the Zarr group * @throws IOException if the metadata cannot be serialized */ - public Group createGroup(String key, Attributes attributes) - throws IOException, ZarrException { + public Group createGroup(String key, Attributes attributes) throws IOException, ZarrException { return Group.create(storeHandle.resolve(key), new GroupMetadata(attributes)); } @@ -237,8 +249,7 @@ public Group createGroup(String key) throws IOException { * @throws IOException if the metadata cannot be serialized * @throws ZarrException if the array cannot be created */ - public Array createArray(String key, ArrayMetadata arrayMetadata) - throws IOException, ZarrException { + public Array createArray(String key, ArrayMetadata arrayMetadata) throws IOException, ZarrException { return Array.create(storeHandle.resolve(key), arrayMetadata); } @@ -249,9 +260,7 @@ public Array createArray(String key, ArrayMetadata arrayMetadata) * @param arrayMetadataBuilderMapper a function building the metadata of the Zarr array * @throws IOException if the metadata cannot be serialized */ - public Array createArray(String key, - Function arrayMetadataBuilderMapper) - throws IOException, ZarrException { + public Array createArray(String key, Function arrayMetadataBuilderMapper) throws IOException, ZarrException { return Array.create(storeHandle.resolve(key), arrayMetadataBuilderMapper, false); } @@ -262,8 +271,7 @@ private Group writeMetadata() throws IOException { private Group writeMetadata(GroupMetadata newGroupMetadata) throws IOException { ObjectWriter objectWriter = makeObjectWriter(); ByteBuffer metadataBytes = ByteBuffer.wrap(objectWriter.writeValueAsBytes(newGroupMetadata)); - storeHandle.resolve(ZARR_JSON) - .set(metadataBytes); + storeHandle.resolve(ZARR_JSON).set(metadataBytes); this.metadata = newGroupMetadata; return this; } @@ -276,8 +284,7 @@ private Group writeMetadata(GroupMetadata newGroupMetadata) throws IOException { * @throws ZarrException if the new attributes are invalid * @throws IOException if the metadata cannot be serialized */ - public Group updateAttributes(Function attributeMapper) - throws ZarrException, IOException { + public Group updateAttributes(Function attributeMapper) throws ZarrException, IOException { return setAttributes(attributeMapper.apply(metadata.attributes)); } diff --git a/src/test/java/dev/zarr/zarrjava/Utils.java b/src/test/java/dev/zarr/zarrjava/Utils.java new file mode 100644 index 0000000..5356af6 --- /dev/null +++ b/src/test/java/dev/zarr/zarrjava/Utils.java @@ -0,0 +1,84 @@ +package dev.zarr.zarrjava; + +import java.io.*; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; +import java.util.zip.ZipOutputStream; + +public class Utils { + + static void zipFile(Path sourceDir, Path targetDir) throws IOException { + FileOutputStream fos = new FileOutputStream(targetDir.toFile()); + ZipOutputStream zipOut = new ZipOutputStream(fos); + + File fileToZip = new File(sourceDir.toUri()); + + zipFile(fileToZip, "", zipOut); + zipOut.close(); + fos.close(); + } + + static void zipFile(File fileToZip, String fileName, ZipOutputStream zipOut) throws IOException { + if (fileToZip.isHidden()) { + return; + } + if (fileToZip.isDirectory()) { + if (fileName.endsWith("/")) { + zipOut.putNextEntry(new ZipEntry(fileName)); + zipOut.closeEntry(); + } else { + zipOut.putNextEntry(new ZipEntry(fileName + "/")); + zipOut.closeEntry(); + } + File[] children = fileToZip.listFiles(); + for (File childFile : children) { + zipFile(childFile, fileName + "/" + childFile.getName(), zipOut); + } + return; + } + FileInputStream fis = new FileInputStream(fileToZip); + ZipEntry zipEntry = new ZipEntry(fileName); + zipOut.putNextEntry(zipEntry); + byte[] bytes = new byte[1024]; + int length; + while ((length = fis.read(bytes)) >= 0) { + zipOut.write(bytes, 0, length); + } + fis.close(); + } + + /** + * Unzip sourceZip into targetDir. + * Protects against Zip Slip by ensuring extracted paths remain inside targetDir. + */ + static void unzipFile(Path sourceZip, Path targetDir) throws IOException { + Files.createDirectories(targetDir); + try (FileInputStream fis = new FileInputStream(sourceZip.toFile()); + ZipInputStream zis = new ZipInputStream(fis)) { + ZipEntry entry; + while ((entry = zis.getNextEntry()) != null) { + Path outPath = targetDir.resolve(entry.getName()).normalize(); + Path targetDirNorm = targetDir.normalize(); + if (!outPath.startsWith(targetDirNorm)) { + throw new IOException("Zip entry is outside of the target dir: " + entry.getName()); + } + if (entry.isDirectory() || entry.getName().endsWith("/")) { + Files.createDirectories(outPath); + } else { + Files.createDirectories(outPath.getParent()); + try (BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(outPath.toFile()))) { + byte[] buffer = new byte[1024]; + int len; + while ((len = zis.read(buffer)) > 0) { + bos.write(buffer, 0, len); + } + } + } + zis.closeEntry(); + } + } + } + +} diff --git a/src/test/java/dev/zarr/zarrjava/ZarrStoreTest.java b/src/test/java/dev/zarr/zarrjava/ZarrStoreTest.java index cbb21f8..6e491be 100644 --- a/src/test/java/dev/zarr/zarrjava/ZarrStoreTest.java +++ b/src/test/java/dev/zarr/zarrjava/ZarrStoreTest.java @@ -1,30 +1,151 @@ package dev.zarr.zarrjava; +import com.adobe.testing.s3mock.testcontainers.S3MockContainer; import com.fasterxml.jackson.databind.ObjectMapper; -import dev.zarr.zarrjava.core.Attributes; -import dev.zarr.zarrjava.store.FilesystemStore; -import dev.zarr.zarrjava.store.HttpStore; -import dev.zarr.zarrjava.store.MemoryStore; -import dev.zarr.zarrjava.store.S3Store; -import dev.zarr.zarrjava.v3.*; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; +import dev.zarr.zarrjava.core.*; +import dev.zarr.zarrjava.store.*; +import org.apache.commons.compress.archivers.zip.ZipArchiveEntry; +import org.apache.commons.compress.archivers.zip.ZipFile; +import org.junit.jupiter.api.*; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.MethodSource; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.http.apache.ApacheHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3Configuration; +import software.amazon.awssdk.utils.AttributeMap; import java.io.IOException; +import java.io.InputStream; import java.net.URI; +import java.nio.ByteBuffer; import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; import java.util.stream.Stream; +import java.util.zip.ZipEntry; +import static dev.zarr.zarrjava.Utils.unzipFile; +import static dev.zarr.zarrjava.Utils.zipFile; import static dev.zarr.zarrjava.v3.Node.makeObjectMapper; +import static software.amazon.awssdk.http.SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES; + + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@Testcontainers +class S3StoreTest { + private final String TEST_BUCKET = "test-bucket"; + @Container + private S3MockContainer s3Mock; + private S3Client s3Client; + + @BeforeAll + void setUp() { + s3Mock = new S3MockContainer("latest").withInitialBuckets(TEST_BUCKET); + s3Mock.start(); + SdkHttpClient httpClient = ApacheHttpClient.builder().buildWithDefaults(AttributeMap.builder().put(TRUST_ALL_CERTIFICATES, Boolean.TRUE).build()); + s3Client = S3Client.builder().endpointOverride(URI.create(s3Mock.getHttpsEndpoint())).serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build()).credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("foo", "bar"))).region(Region.US_EAST_1) // required, but ignored + .httpClient(httpClient).build(); + } + + @AfterAll + void tearDown() { + if (s3Mock.isRunning()) { + s3Mock.stop(); + } + } + + @Test + void testS3Mock() { + Assertions.assertTrue(s3Mock.isRunning()); + } + + @Test + void testReadWriteS3Store() { + S3Store s3Store = new S3Store(s3Client, TEST_BUCKET, ""); + + StoreHandle storeHandle = s3Store.resolve("testfile"); + byte[] testData = new byte[100]; + for (int i = 0; i < testData.length; i++) { + testData[i] = (byte) i; + } + storeHandle.set(ByteBuffer.wrap(testData)); + ByteBuffer retrievedData = storeHandle.read(); + byte[] retrievedBytes = new byte[retrievedData.remaining()]; + retrievedData.get(retrievedBytes); + Assertions.assertArrayEquals(testData, retrievedBytes); + } + +} public class ZarrStoreTest extends ZarrTest { + static StoreHandle createS3StoreHandle() { + S3Store s3Store = new S3Store(S3Client.builder() + .endpointOverride(URI.create("https://uk1s3.embassy.ebi.ac.uk")) + .region(Region.US_EAST_1) // required, but ignored + .serviceConfiguration( + S3Configuration.builder() + .pathStyleAccessEnabled(true) // required + .build() + ) + .credentialsProvider(AnonymousCredentialsProvider.create()) + .build(), "idr", "zarr/v0.5/idr0033A"); + return s3Store.resolve("BR00109990_C2.zarr", "0", "0"); + } + + static Stream inputStreamStores() throws IOException { + StoreHandle s3StoreHandle = createS3StoreHandle().resolve("zarr.json"); + + byte[] testData = new byte[100]; + for (int i = 0; i < testData.length; i++) { + testData[i] = (byte) i; + } + + StoreHandle memoryStoreHandle = new MemoryStore().resolve(); + memoryStoreHandle.set(ByteBuffer.wrap(testData)); + + StoreHandle fsStoreHandle = new FilesystemStore(TESTOUTPUT.resolve("testInputStreamFS")).resolve("testfile"); + fsStoreHandle.set(ByteBuffer.wrap(testData)); + + zipFile(TESTOUTPUT.resolve("testInputStreamFS"), TESTOUTPUT.resolve("testInputStreamZIP.zip")); + StoreHandle bufferedZipStoreHandle = new BufferedZipStore(TESTOUTPUT.resolve("testInputStreamZIP.zip"), true) + .resolve("testfile"); + + StoreHandle readOnlyZipStoreHandle = new ReadOnlyZipStore(TESTOUTPUT.resolve("testInputStreamZIP.zip")) + .resolve("testfile"); + + StoreHandle httpStoreHandle = new HttpStore("https://static.webknossos.org/data/zarr_v3/l4_sample") + .resolve("color", "1", "zarr.json"); + return Stream.of( + memoryStoreHandle, + s3StoreHandle, + fsStoreHandle, + bufferedZipStoreHandle, + readOnlyZipStoreHandle, + httpStoreHandle + ); + } + + static Stream localStores() { + return Stream.of( + new MemoryStore(), + new FilesystemStore(TESTOUTPUT.resolve("testLocalStoresFS")), + new BufferedZipStore(TESTOUTPUT.resolve("testLocalStoresZIP.zip"), true), + new ReadOnlyZipStore(TESTOUTPUT.resolve("testLocalStoresReadOnlyZIP.zip")) + ); + } + @Test public void testFileSystemStores() throws IOException, ZarrException { FilesystemStore fsStore = new FilesystemStore(TESTDATA); @@ -32,7 +153,7 @@ public void testFileSystemStores() throws IOException, ZarrException { GroupMetadata groupMetadata = objectMapper.readValue( Files.readAllBytes(TESTDATA.resolve("l4_sample").resolve("zarr.json")), - GroupMetadata.class + dev.zarr.zarrjava.v3.GroupMetadata.class ); String groupMetadataString = objectMapper.writeValueAsString(groupMetadata); @@ -41,7 +162,7 @@ public void testFileSystemStores() throws IOException, ZarrException { ArrayMetadata arrayMetadata = objectMapper.readValue(Files.readAllBytes(TESTDATA.resolve( "l4_sample").resolve("color").resolve("1").resolve("zarr.json")), - ArrayMetadata.class); + dev.zarr.zarrjava.v3.ArrayMetadata.class); String arrayMetadataString = objectMapper.writeValueAsString(arrayMetadata); Assertions.assertTrue(arrayMetadataString.contains("\"zarr_format\":3")); @@ -51,8 +172,7 @@ public void testFileSystemStores() throws IOException, ZarrException { Assertions.assertInstanceOf(Array.class, Array.open(fsStore.resolve("l4_sample", "color", "1"))); Node[] subNodes = Group.open(fsStore.resolve("l4_sample")).list().toArray(Node[]::new); - Assertions.assertEquals(2, subNodes.length); - Assertions.assertInstanceOf(Group.class, subNodes[0]); + Assertions.assertEquals(12, subNodes.length); Array[] colorSubNodes = ((Group) Group.open(fsStore.resolve("l4_sample")).get("color")).list().toArray(Array[]::new); @@ -65,26 +185,49 @@ public void testFileSystemStores() throws IOException, ZarrException { @Test public void testS3Store() throws IOException, ZarrException { - S3Store s3Store = new S3Store(S3Client.builder() - .endpointOverride(URI.create("https://uk1s3.embassy.ebi.ac.uk")) - .region(Region.US_EAST_1) // required, but ignored - .serviceConfiguration( - S3Configuration.builder() - .pathStyleAccessEnabled(true) // required - .build() - ) - .credentialsProvider(AnonymousCredentialsProvider.create()) - .build(), "idr", "zarr/v0.5/idr0033A"); - - Array arrayV3 = Array.open(s3Store.resolve("BR00109990_C2.zarr", "0", "0")); + StoreHandle s3StoreHandle = createS3StoreHandle(); + Array arrayV3 = Array.open(s3StoreHandle); Assertions.assertArrayEquals(new long[]{5, 1552, 2080}, arrayV3.metadata().shape); Assertions.assertEquals(574, arrayV3.read(new long[]{0, 0, 0}, new int[]{1, 1, 1}).getInt(0)); - dev.zarr.zarrjava.core.Array arrayCore = dev.zarr.zarrjava.core.Array.open(s3Store.resolve("BR00109990_C2.zarr", "0", "0")); + dev.zarr.zarrjava.core.Array arrayCore = dev.zarr.zarrjava.core.Array.open(s3StoreHandle); Assertions.assertArrayEquals(new long[]{5, 1552, 2080}, arrayCore.metadata().shape); Assertions.assertEquals(574, arrayCore.read(new long[]{0, 0, 0}, new int[]{1, 1, 1}).getInt(0)); } + @Test + public void testS3StoreGet() throws ZarrException { + StoreHandle s3StoreHandle = createS3StoreHandle().resolve("zarr.json"); + S3Store s3Store = (S3Store) s3StoreHandle.store; + ByteBuffer buffer = s3Store.get(s3StoreHandle.keys); + ByteBuffer bufferWithStart = s3Store.get(s3StoreHandle.keys, 10); + Assertions.assertEquals(10, buffer.remaining() - bufferWithStart.remaining()); + + ByteBuffer bufferWithStartAndEnd = s3Store.get(s3StoreHandle.keys, 0, 10); + Assertions.assertEquals(10, bufferWithStartAndEnd.remaining()); + + } + + @ParameterizedTest + @MethodSource("inputStreamStores") + public void testStoreInputStream(StoreHandle storeHandle) throws IOException { + InputStream is = storeHandle.getInputStream(10, 20); + byte[] buffer = new byte[10]; + int bytesRead = is.read(buffer); + Assertions.assertEquals(10, bytesRead); + byte[] expectedBuffer = new byte[10]; + storeHandle.read(10, 20).get(expectedBuffer); + Assertions.assertArrayEquals(expectedBuffer, buffer); + } + + @ParameterizedTest + @MethodSource("inputStreamStores") + public void testStoreGetSize(StoreHandle storeHandle) { + long size = storeHandle.getSize(); + long actual_size = storeHandle.read().remaining(); + Assertions.assertEquals(actual_size, size); + } + @Test public void testHttpStore() throws IOException, ZarrException { HttpStore httpStore = new dev.zarr.zarrjava.store.HttpStore("https://uk1s3.embassy.ebi.ac.uk/idr/zarr/v0.5/idr0033A"); @@ -96,13 +239,12 @@ public void testHttpStore() throws IOException, ZarrException { @ParameterizedTest @CsvSource({"false", "true",}) public void testMemoryStoreV3(boolean useParallel) throws ZarrException, IOException { - int[] testData = new int[1024 * 1024]; - Arrays.setAll(testData, p -> p); + int[] testData = testData(); - Group group = Group.create(new MemoryStore().resolve()); + dev.zarr.zarrjava.v3.Group group = dev.zarr.zarrjava.v3.Group.create(new MemoryStore().resolve()); Array array = group.createArray("array", b -> b .withShape(1024, 1024) - .withDataType(DataType.UINT32) + .withDataType(dev.zarr.zarrjava.v3.DataType.UINT32) .withChunkShape(5, 5) ); array.write(ucar.ma2.Array.factory(ucar.ma2.DataType.UINT, new int[]{1024, 1024}, testData), useParallel); @@ -121,14 +263,13 @@ public void testMemoryStoreV3(boolean useParallel) throws ZarrException, IOExcep @ParameterizedTest @CsvSource({"false", "true",}) public void testMemoryStoreV2(boolean useParallel) throws ZarrException, IOException { - int[] testData = new int[1024 * 1024]; - Arrays.setAll(testData, p -> p); + int[] testData = testData(); dev.zarr.zarrjava.v2.Group group = dev.zarr.zarrjava.v2.Group.create(new MemoryStore().resolve()); dev.zarr.zarrjava.v2.Array array = group.createArray("array", b -> b .withShape(1024, 1024) .withDataType(dev.zarr.zarrjava.v2.DataType.UINT32) - .withChunks(5, 5) + .withChunks(512, 512) ); array.write(ucar.ma2.Array.factory(ucar.ma2.DataType.UINT, new int[]{1024, 1024}, testData), useParallel); group.createGroup("subgroup"); @@ -143,4 +284,248 @@ public void testMemoryStoreV2(boolean useParallel) throws ZarrException, IOExcep Assertions.assertEquals("test group", attrs.getString("description")); } + + @Test + public void testOpenZipStore() throws ZarrException, IOException { + Path sourceDir = TESTOUTPUT.resolve("testZipStore"); + Path targetDir = TESTOUTPUT.resolve("testZipStore.zip"); + FilesystemStore fsStore = new FilesystemStore(sourceDir); + writeTestGroupV3(fsStore, true); + + zipFile(sourceDir, targetDir); + + BufferedZipStore zipStore = new BufferedZipStore(targetDir); + assertIsTestGroupV3(Group.open(zipStore.resolve()), true); + + ReadOnlyZipStore readOnlyZipStore = new ReadOnlyZipStore(targetDir); + assertIsTestGroupV3(Group.open(readOnlyZipStore.resolve()), true); + } + + @ParameterizedTest + @CsvSource({"false", "true",}) + public void testWriteZipStore(boolean flushOnWrite) throws ZarrException, IOException { + Path path = TESTOUTPUT.resolve("testWriteZipStore" + (flushOnWrite ? "Flush" : "NoFlush") + ".zip"); + BufferedZipStore zipStore = new BufferedZipStore(path, flushOnWrite); + writeTestGroupV3(zipStore, true); + if (!flushOnWrite) zipStore.flush(); + + BufferedZipStore zipStoreRead = new BufferedZipStore(path); + assertIsTestGroupV3(Group.open(zipStoreRead.resolve()), true); + + Path unzippedPath = TESTOUTPUT.resolve("testWriteZipStoreUnzipped" + (flushOnWrite ? "Flush" : "NoFlush")); + + unzipFile(path, unzippedPath); + FilesystemStore fsStore = new FilesystemStore(unzippedPath); + assertIsTestGroupV3(Group.open(fsStore.resolve()), true); + } + + @ParameterizedTest + @CsvSource({"false", "true",}) + public void testZipStoreWithComment(boolean flushOnWrite) throws ZarrException, IOException { + Path path = TESTOUTPUT.resolve("testZipStoreWithComment" + (flushOnWrite ? "Flush" : "NoFlush") + ".zip"); + String comment = "{\"ome\": { \"version\": \"XX.YY\" }}"; + BufferedZipStore zipStore = new BufferedZipStore(path, comment, flushOnWrite); + writeTestGroupV3(zipStore, true); + if (!flushOnWrite) zipStore.flush(); + + try (java.util.zip.ZipFile zipFile = new java.util.zip.ZipFile(path.toFile())) { + String retrievedComment = zipFile.getComment(); + Assertions.assertEquals(comment, retrievedComment, "ZIP archive comment does not match expected value."); + } + + Assertions.assertEquals(comment, new BufferedZipStore(path).getArchiveComment(), "ZIP archive comment from store does not match expected value."); + } + + /** + * Test that ZipStore meets requirements for underlying store of Zipped OME-Zarr + * + * @see RFC-9: Zipped OME-Zarr + */ + @Test + public void testZipStoreRequirements() throws ZarrException, IOException { + Path path = TESTOUTPUT.resolve("testZipStoreRequirements.zip"); + BufferedZipStore zipStore = new BufferedZipStore(path); + + dev.zarr.zarrjava.v3.Group group = dev.zarr.zarrjava.v3.Group.create(zipStore.resolve()); + Array array = group.createArray("a1", b -> b + .withShape(1024, 1024) + .withDataType(dev.zarr.zarrjava.v3.DataType.UINT32) + .withChunkShape(512, 512) + ); + array.write(ucar.ma2.Array.factory(ucar.ma2.DataType.UINT, new int[]{1024, 1024}, testData()), true); + + dev.zarr.zarrjava.v3.Group g1 = group.createGroup("g1"); + g1.createGroup("g1_1").createGroup("g1_1_1"); + g1.createGroup("g1_2"); + group.createGroup("g2").createGroup("g2_1"); + group.createGroup("g3"); + + zipStore.flush(); + + try (ZipFile zip = new ZipFile(path.toFile())) { + ArrayList entries = Collections.list(zip.getEntries()); + + // no compression + for (ZipArchiveEntry e : entries) { + Assertions.assertEquals(ZipEntry.STORED, e.getMethod(), "Entry " + e.getName() + " is compressed"); + } + + // correct order of zarr.json files + String[] expectedFirstEntries = new String[]{ + "zarr.json", + "a1/zarr.json", + "g1/zarr.json", + "g2/zarr.json", + "g3/zarr.json", + "g1/g1_1/zarr.json", + "g1/g1_2/zarr.json", + "g2/g2_1/zarr.json", + "g1/g1_1/g1_1_1/zarr.json" + }; + String[] actualFirstEntries = entries.stream() + .map(ZipArchiveEntry::getName) + .limit(expectedFirstEntries.length) + .toArray(String[]::new); + + Assertions.assertArrayEquals(expectedFirstEntries, actualFirstEntries, "zarr.json files are not in the expected breadth-first order"); + } + } + + @ParameterizedTest + @CsvSource({"false", "true",}) + public void testZipStoreV2(boolean flushOnWrite) throws ZarrException, IOException { + Path path = TESTOUTPUT.resolve("testZipStoreV2" + (flushOnWrite ? "Flush" : "NoFlush") + ".zip"); + BufferedZipStore zipStore = new BufferedZipStore(path, flushOnWrite); + writeTestGroupV2(zipStore, true); + if (!flushOnWrite) zipStore.flush(); + + BufferedZipStore zipStoreRead = new BufferedZipStore(path); + assertIsTestGroupV2(Group.open(zipStoreRead.resolve()), true); + + Path unzippedPath = TESTOUTPUT.resolve("testZipStoreV2Unzipped"); + + unzipFile(path, unzippedPath); + FilesystemStore fsStore = new FilesystemStore(unzippedPath); + assertIsTestGroupV2(Group.open(fsStore.resolve()), true); + } + + @Test + public void testReadOnlyZipStore() throws ZarrException, IOException { + Path path = TESTOUTPUT.resolve("testReadOnlyZipStore.zip"); + String archiveComment = "This is a test ZIP archive comment."; + BufferedZipStore zipStore = new BufferedZipStore(path, archiveComment); + writeTestGroupV3(zipStore, true); + zipStore.flush(); + + ReadOnlyZipStore readOnlyZipStore = new ReadOnlyZipStore(path); + Assertions.assertEquals(archiveComment, readOnlyZipStore.getArchiveComment(), "ZIP archive comment from ReadOnlyZipStore does not match expected value."); + assertIsTestGroupV3(Group.open(readOnlyZipStore.resolve()), true); + } + + @ParameterizedTest + @MethodSource("localStores") + public void testLocalStores(Store.ListableStore store) throws IOException, ZarrException { + boolean useParallel = true; + Store writeStore = store; + if (store instanceof ReadOnlyZipStore) { + StoreHandle underlyingStore = ((ReadOnlyZipStore)store).underlyingStore; + writeStore = new BufferedZipStore(underlyingStore, true); + } + Group group = writeTestGroupV3(writeStore, useParallel); + + java.util.Set expectedSubgroupKeys = new java.util.HashSet<>(Arrays.asList( + "array/c/1/1", + "array/c/0/0", + "array/c/0/1", + "zarr.json", + "array", + "array/c/1/0", + "array/c/1", + "array/c/0", + "array/zarr.json", + "array/c" + )); + + java.util.Set actualKeys = store.resolve("subgroup").list() + .map(node -> String.join("/", node)) + .collect(Collectors.toSet()); + + Assertions.assertEquals(expectedSubgroupKeys, actualKeys); + + assertIsTestGroupV3(group, useParallel); + } + + + int[] testData() { + int[] testData = new int[1024 * 1024]; + Arrays.setAll(testData, p -> p); + return testData; + } + + Group writeTestGroupV3(Store store, boolean useParallel) throws ZarrException, IOException { + StoreHandle storeHandle = store.resolve(); + + dev.zarr.zarrjava.v3.Group group = dev.zarr.zarrjava.v3.Group.create(storeHandle); + dev.zarr.zarrjava.v3.Array array = group.createArray("array", b -> b + .withShape(1024, 1024) + .withDataType(dev.zarr.zarrjava.v3.DataType.UINT32) + .withChunkShape(512, 512) + ); + array.write(ucar.ma2.Array.factory(ucar.ma2.DataType.UINT, new int[]{1024, 1024}, testData()), useParallel); + dev.zarr.zarrjava.v3.Group subgroup = group.createGroup("subgroup"); + dev.zarr.zarrjava.v3.Array subgrouparray = subgroup.createArray("array", b -> b + .withShape(1024, 1024) + .withDataType(dev.zarr.zarrjava.v3.DataType.UINT32) + .withChunkShape(512, 512) + ); + subgrouparray.write(ucar.ma2.Array.factory(ucar.ma2.DataType.UINT, new int[]{1024, 1024}, testData()), useParallel); + + group.setAttributes(new Attributes(b -> b.set("some", "value"))); + return group; + } + + void assertIsTestGroupV3(Group group, boolean useParallel) throws ZarrException, IOException { + Stream nodes = group.list(); + List nodeList = nodes.collect(Collectors.toList()); + Assertions.assertEquals(3, nodeList.size()); + Array array = (Array) group.get("array"); + Assertions.assertNotNull(array); + ucar.ma2.Array result = array.read(useParallel); + Assertions.assertArrayEquals(testData(), (int[]) result.get1DJavaArray(ucar.ma2.DataType.UINT)); + Group subgroup = (Group) group.get("subgroup"); + Array subgrouparray = (Array) subgroup.get("array"); + result = subgrouparray.read(useParallel); + Assertions.assertArrayEquals(testData(), (int[]) result.get1DJavaArray(ucar.ma2.DataType.UINT)); + Attributes attrs = group.metadata().attributes(); + Assertions.assertNotNull(attrs); + Assertions.assertEquals("value", attrs.getString("some")); + } + + + dev.zarr.zarrjava.v2.Group writeTestGroupV2(Store store, boolean useParallel) throws ZarrException, IOException { + StoreHandle storeHandle = store.resolve(); + + dev.zarr.zarrjava.v2.Group group = dev.zarr.zarrjava.v2.Group.create(storeHandle); + dev.zarr.zarrjava.v2.Array array = group.createArray("array", b -> b + .withShape(1024, 1024) + .withDataType(dev.zarr.zarrjava.v2.DataType.UINT32) + .withChunks(512, 512) + ); + array.write(ucar.ma2.Array.factory(ucar.ma2.DataType.UINT, new int[]{1024, 1024}, testData()), useParallel); + group.createGroup("subgroup"); + group.setAttributes(new Attributes().set("some", "value")); + return group; + } + + void assertIsTestGroupV2(Group group, boolean useParallel) throws ZarrException, IOException { + Stream nodes = group.list(); + Assertions.assertEquals(2, nodes.count()); + Array array = (Array) group.get("array"); + Assertions.assertNotNull(array); + ucar.ma2.Array result = array.read(useParallel); + Assertions.assertArrayEquals(testData(), (int[]) result.get1DJavaArray(ucar.ma2.DataType.UINT)); + Attributes attrs = group.metadata().attributes(); + Assertions.assertNotNull(attrs); + Assertions.assertEquals("value", attrs.getString("some")); + } } diff --git a/src/test/java/dev/zarr/zarrjava/ZarrV2Test.java b/src/test/java/dev/zarr/zarrjava/ZarrV2Test.java index 084d831..534a351 100644 --- a/src/test/java/dev/zarr/zarrjava/ZarrV2Test.java +++ b/src/test/java/dev/zarr/zarrjava/ZarrV2Test.java @@ -248,8 +248,9 @@ public void testGroup() throws IOException, ZarrException { .withChunks(5, 5) ); array.write(new long[]{2, 2}, ucar.ma2.Array.factory(ucar.ma2.DataType.UBYTE, new int[]{8, 8})); - - Assertions.assertArrayEquals(new int[]{5, 5}, ((Array) ((Group) group.listAsArray()[0]).listAsArray()[0]).metadata().chunks); + Array[] arrays = group.list().filter(n -> n instanceof Array).toArray(Array[]::new); + Assertions.assertEquals(1, arrays.length); + Assertions.assertArrayEquals(new int[]{5, 5}, arrays[0].metadata().chunks); } @Test @@ -403,8 +404,6 @@ public void testMemoryStore() throws ZarrException, IOException { ); group.createGroup("subgroup"); Assertions.assertEquals(2, group.list().count()); - for (String s : storeHandle.list().toArray(String[]::new)) - System.out.println(s); } } \ No newline at end of file