Skip to content

Commit bcdcf46

Browse files
committed
[FLINK-38592] Native s3 FileSystem
1 parent 46b54bc commit bcdcf46

26 files changed

+4072
-0
lines changed
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
# Native S3 FileSystem
2+
3+
This module provides a native S3 filesystem implementation for Apache Flink using AWS SDK v2.
4+
5+
## Overview
6+
7+
The Native S3 FileSystem is a direct implementation of Flink's FileSystem interface using AWS SDK v2, without Hadoop dependencies. It provides exactly-once semantics for checkpointing and file sinks through S3 multipart uploads.
8+
9+
## Usage
10+
11+
Add this module to Flink's plugins directory:
12+
13+
```bash
14+
mkdir -p $FLINK_HOME/plugins/s3-fs-native
15+
cp flink-s3-fs-native-*.jar $FLINK_HOME/plugins/s3-fs-native/
16+
```
17+
18+
Configure S3 credentials in `conf/config.yaml`:
19+
20+
```yaml
21+
s3.access-key: YOUR_ACCESS_KEY
22+
s3.secret-key: YOUR_SECRET_KEY
23+
s3.endpoint: https://s3.amazonaws.com # Optional, defaults to AWS
24+
```
25+
26+
Use S3 paths in your Flink application:
27+
28+
```java
29+
env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/checkpoints");
30+
31+
DataStream<String> input = env.readTextFile("s3://my-bucket/input");
32+
input.sinkTo(FileSink.forRowFormat(new Path("s3://my-bucket/output"),
33+
new SimpleStringEncoder<>()).build());
34+
```
35+
36+
## Configuration Options
37+
38+
| Key | Default | Description |
39+
|-----|---------|-------------|
40+
| s3.access-key | (none) | AWS access key |
41+
| s3.secret-key | (none) | AWS secret key |
42+
| s3.region | us-east-1 | AWS region |
43+
| s3.endpoint | (none) | Custom S3 endpoint (for MinIO, LocalStack, etc.) |
44+
| s3.path-style-access | false | Use path-style access (auto-enabled for custom endpoints) |
45+
| s3.upload.min.part.size | 5242880 | Minimum part size for multipart uploads (5MB) |
46+
| s3.upload.max.concurrent.uploads | CPU cores | Maximum concurrent uploads per stream |
47+
| s3.entropy.key | (none) | Key for entropy injection in paths |
48+
| s3.entropy.length | 4 | Length of entropy string |
49+
| s3.bulk-copy.enabled | true | Enable bulk copy operations |
50+
| s3.async.enabled | true | Enable async read/write with TransferManager |
51+
| s3.read.buffer.size | 262144 (256KB) | Read buffer size per stream (64KB - 4MB) |
52+
53+
## MinIO and S3-Compatible Storage
54+
55+
The filesystem auto-detects custom endpoints and configures appropriate settings:
56+
57+
```yaml
58+
s3.access-key: minioadmin
59+
s3.secret-key: minioadmin
60+
s3.endpoint: http://localhost:9000
61+
s3.path-style-access: true # Auto-enabled for custom endpoints
62+
```
63+
64+
MinIO-specific optimizations are applied automatically:
65+
- Path-style access enabled
66+
- Chunked encoding disabled (compatibility)
67+
- Checksum validation disabled (compatibility)
68+
69+
## Memory Optimization for Large Files
70+
71+
The filesystem is optimized to handle large files without OOM errors:
72+
73+
### Streaming Reads (No Buffering)
74+
- Files are **streamed** chunk-by-chunk, not loaded into memory
75+
- Configurable read buffer (default 256KB) prevents memory spikes
76+
- Only small buffer held in memory at any time
77+
78+
### Configuration for Memory Efficiency
79+
80+
```yaml
81+
# Read buffer: smaller = less memory, larger = better throughput
82+
s3.read.buffer.size: 262144 # 256KB (default)
83+
# For memory-constrained environments: 65536 (64KB)
84+
# For high-throughput: 1048576 (1MB)
85+
```
86+
87+
**Memory Calculation Per Parallel Read:**
88+
- Buffer size × concurrent reads = total memory
89+
- Example: 256KB buffer × 16 parallel readers = 4MB total
90+
- This allows processing GB-sized files with MB-sized memory
91+
92+
### OOM Prevention Strategies
93+
94+
1. **Use smaller read buffers** (64-128KB) for very large files
95+
2. **Reduce parallelism** to limit concurrent S3 readers
96+
3. **Enable managed memory** for Flink state backend
97+
4. **Monitor**: `s3.read.buffer.size` × `parallelism` = peak memory
98+
99+
## Async Operations with TransferManager
100+
101+
The filesystem uses AWS SDK's TransferManager for high-performance async read/write operations:
102+
103+
**Benefits:**
104+
- **Automatic multipart management**: TransferManager automatically handles multipart uploads for large files
105+
- **Parallel transfers**: Multiple parts uploaded concurrently for maximum throughput
106+
- **Progress tracking**: Built-in progress monitoring and retry logic
107+
- **Resource optimization**: Efficient connection pooling and memory management
108+
- **Streaming uploads**: Data streamed from disk, not buffered in memory
109+
110+
**Configuration:**
111+
```yaml
112+
s3.async.enabled: true # Default: enabled
113+
```
114+
115+
When enabled, file uploads automatically use TransferManager for:
116+
- Large file uploads (automatic multipart handling)
117+
- Checkpoint data writes
118+
- Recoverable output stream operations
119+
120+
**Performance Impact:**
121+
- Up to 10x faster uploads for large files (>100MB)
122+
- **Reduced memory pressure** through streaming
123+
- Better utilization of available bandwidth
124+
- Lower heap requirements for write operations
125+
126+
## Checkpointing
127+
128+
Configure checkpoint storage in `conf/config.yaml`:
129+
130+
```yaml
131+
state.checkpoints.dir: s3://my-bucket/checkpoints
132+
execution.checkpointing.interval: 10s
133+
execution.checkpointing.mode: EXACTLY_ONCE
134+
```
135+
136+
Or programmatically:
137+
138+
```java
139+
env.enableCheckpointing(10000);
140+
env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/checkpoints");
141+
```
142+
143+
## Entropy Injection
144+
145+
For high-throughput checkpointing to avoid S3 hot partitions:
146+
147+
```yaml
148+
s3.entropy.key: _entropy_
149+
s3.entropy.length: 4
150+
```
151+
152+
Paths like `s3://bucket/_entropy_/checkpoints` will be expanded to `s3://bucket/af7e/checkpoints` with random characters.
153+
154+
## Implementation Details
155+
156+
The filesystem uses:
157+
- **AWS SDK v2** for all S3 operations
158+
- **Multipart uploads** for recoverable writes and large files
159+
- **S3 Transfer Manager** for bulk copy operations
160+
- **Separate sync and async clients** for optimal performance
161+
162+
Key classes:
163+
- `NativeS3FileSystem` - Main FileSystem implementation
164+
- `NativeS3RecoverableWriter` - Exactly-once writer using multipart uploads
165+
- `S3ClientProvider` - Manages S3 client lifecycle
166+
- `NativeS3AccessHelper` - Low-level S3 operations
167+
168+
## Building
169+
170+
```bash
171+
mvn clean package
172+
```
173+
174+
## Testing with MinIO
175+
176+
```bash
177+
# Start MinIO
178+
docker run -d -p 9000:9000 -p 9001:9001 \
179+
-e "MINIO_ROOT_USER=minioadmin" \
180+
-e "MINIO_ROOT_PASSWORD=minioadmin" \
181+
minio/minio server /data --console-address ":9001"
182+
183+
# Create bucket
184+
mc alias set local http://localhost:9000 minioadmin minioadmin
185+
mc mb local/test-bucket
186+
187+
# Run Flink with MinIO
188+
export FLINK_HOME=/path/to/flink
189+
cat > $FLINK_HOME/conf/config.yaml <<EOF
190+
s3.endpoint: http://localhost:9000
191+
s3.access-key: minioadmin
192+
s3.secret-key: minioadmin
193+
s3.path-style-access: true
194+
EOF
195+
196+
$FLINK_HOME/bin/flink run YourJob.jar
197+
```
198+
199+
## Delegation Tokens
200+
201+
The filesystem supports delegation tokens for secure multi-tenant deployments. The delegation token service name is `s3-native` to avoid conflicts with other S3 filesystem implementations.
202+
203+
Configure delegation tokens:
204+
205+
```yaml
206+
security.delegation.token.provider.s3-native.enabled: true
207+
security.delegation.token.provider.s3-native.access-key: YOUR_KEY
208+
security.delegation.token.provider.s3-native.secret-key: YOUR_SECRET
209+
security.delegation.token.provider.s3-native.region: us-east-1
210+
```

0 commit comments

Comments
 (0)