22// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
33// See the LICENSE file in the project root for more information
44
5+ using System . Diagnostics ;
6+ using System . Diagnostics . Metrics ;
57using Amazon . S3 ;
68using Amazon . S3 . Model ;
79using Amazon . S3 . Transfer ;
810using Elastic . Documentation . Diagnostics ;
11+ using Elastic . Documentation . ServiceDefaults . Telemetry ;
912using Microsoft . Extensions . Logging ;
1013
1114namespace Elastic . Documentation . Assembler . Deploying . Synchronization ;
1215
13- public class AwsS3SyncApplyStrategy (
16+ public partial class AwsS3SyncApplyStrategy (
1417 ILoggerFactory logFactory ,
1518 IAmazonS3 s3Client ,
1619 ITransferUtility transferUtility ,
@@ -19,27 +22,160 @@ public class AwsS3SyncApplyStrategy(
1922 IDiagnosticsCollector collector
2023) : IDocsSyncApplyStrategy
2124{
25+ private static readonly ActivitySource ApplyStrategyActivitySource = new ( TelemetryConstants . AssemblerSyncInstrumentationName ) ;
26+
27+ // Meter for OpenTelemetry metrics
28+ private static readonly Meter SyncMeter = new ( TelemetryConstants . AssemblerSyncInstrumentationName ) ;
29+
30+ // Deployment-level metrics (low cardinality)
31+ private static readonly Histogram < long > FilesPerDeploymentHistogram = SyncMeter . CreateHistogram < long > (
32+ "docs.deployment.files.count" ,
33+ "files" ,
34+ "Number of files synced per deployment operation" ) ;
35+
36+ private static readonly Counter < long > FilesAddedCounter = SyncMeter . CreateCounter < long > (
37+ "docs.sync.files.added.total" ,
38+ "files" ,
39+ "Total number of files added to S3" ) ;
40+
41+ private static readonly Counter < long > FilesUpdatedCounter = SyncMeter . CreateCounter < long > (
42+ "docs.sync.files.updated.total" ,
43+ "files" ,
44+ "Total number of files updated in S3" ) ;
45+
46+ private static readonly Counter < long > FilesDeletedCounter = SyncMeter . CreateCounter < long > (
47+ "docs.sync.files.deleted.total" ,
48+ "files" ,
49+ "Total number of files deleted from S3" ) ;
50+
51+ private static readonly Histogram < long > FileSizeHistogram = SyncMeter . CreateHistogram < long > (
52+ "docs.sync.file.size" ,
53+ "By" ,
54+ "Distribution of file sizes synced to S3" ) ;
55+
56+ private static readonly Counter < long > FilesByExtensionCounter = SyncMeter . CreateCounter < long > (
57+ "docs.sync.files.by_extension" ,
58+ "files" ,
59+ "File operations grouped by extension" ) ;
60+
61+ private static readonly Histogram < double > SyncDurationHistogram = SyncMeter . CreateHistogram < double > (
62+ "docs.sync.duration" ,
63+ "s" ,
64+ "Duration of sync operations" ) ;
65+
2266 private readonly ILogger < AwsS3SyncApplyStrategy > _logger = logFactory . CreateLogger < AwsS3SyncApplyStrategy > ( ) ;
2367
24- private void DisplayProgress ( object ? sender , UploadDirectoryProgressArgs args ) => LogProgress ( _logger , args , null ) ;
68+ private void DisplayProgress ( object ? sender , UploadDirectoryProgressArgs args ) => LogProgress ( _logger , args ) ;
2569
26- private static readonly Action < ILogger , UploadDirectoryProgressArgs , Exception ? > LogProgress = LoggerMessage . Define < UploadDirectoryProgressArgs > (
27- LogLevel . Information ,
28- new EventId ( 2 , nameof ( LogProgress ) ) ,
29- "{Args}" ) ;
70+ [ LoggerMessage (
71+ EventId = 2 ,
72+ Level = LogLevel . Debug ,
73+ Message = "{Args}" ) ]
74+ private static partial void LogProgress ( ILogger logger , UploadDirectoryProgressArgs args ) ;
75+
76+ [ LoggerMessage (
77+ EventId = 3 ,
78+ Level = LogLevel . Information ,
79+ Message = "File operation: {Operation} | Path: {FilePath} | Size: {FileSize} bytes" ) ]
80+ private static partial void LogFileOperation ( ILogger logger , string operation , string filePath , long fileSize ) ;
3081
3182 public async Task Apply ( SyncPlan plan , Cancel ctx = default )
3283 {
84+ var sw = Stopwatch . StartNew ( ) ;
85+
86+ using var applyActivity = ApplyStrategyActivitySource . StartActivity ( "sync apply" , ActivityKind . Client ) ;
87+ if ( Environment . GetEnvironmentVariable ( "GITHUB_ACTIONS" ) == "true" )
88+ {
89+ _ = applyActivity ? . SetTag ( "cicd.pipeline.name" , Environment . GetEnvironmentVariable ( "GITHUB_WORKFLOW" ) ?? "unknown" ) ;
90+ _ = applyActivity ? . SetTag ( "cicd.pipeline.run.id" , Environment . GetEnvironmentVariable ( "GITHUB_RUN_ID" ) ?? "unknown" ) ;
91+ _ = applyActivity ? . SetTag ( "cicd.pipeline.run.attempt" , Environment . GetEnvironmentVariable ( "GITHUB_RUN_ATTEMPT" ) ?? "unknown" ) ;
92+ }
93+
94+ var addCount = plan . AddRequests . Count ;
95+ var updateCount = plan . UpdateRequests . Count ;
96+ var deleteCount = plan . DeleteRequests . Count ;
97+ var totalFiles = addCount + updateCount + deleteCount ;
98+
99+ // Add aggregate metrics to span
100+ _ = applyActivity ? . SetTag ( "docs.sync.files.added" , addCount ) ;
101+ _ = applyActivity ? . SetTag ( "docs.sync.files.updated" , updateCount ) ;
102+ _ = applyActivity ? . SetTag ( "docs.sync.files.deleted" , deleteCount ) ;
103+ _ = applyActivity ? . SetTag ( "docs.sync.files.total" , totalFiles ) ;
104+
105+ // Record deployment-level metrics
106+ FilesPerDeploymentHistogram . Record ( totalFiles ) ;
107+
108+ if ( addCount > 0 )
109+ {
110+ FilesPerDeploymentHistogram . Record ( addCount ,
111+ [ new ( "operation" , "add" ) ] ) ;
112+ }
113+
114+ if ( updateCount > 0 )
115+ {
116+ FilesPerDeploymentHistogram . Record ( updateCount ,
117+ [ new ( "operation" , "update" ) ] ) ;
118+ }
119+
120+ if ( deleteCount > 0 )
121+ {
122+ FilesPerDeploymentHistogram . Record ( deleteCount ,
123+ [ new ( "operation" , "delete" ) ] ) ;
124+ }
125+
126+ _logger . LogInformation (
127+ "Deployment sync: {TotalFiles} files ({AddCount} added, {UpdateCount} updated, {DeleteCount} deleted) in {Environment}" ,
128+ totalFiles , addCount , updateCount , deleteCount , context . Environment . Name ) ;
129+
33130 await Upload ( plan , ctx ) ;
34131 await Delete ( plan , ctx ) ;
132+
133+ // Record sync duration
134+ SyncDurationHistogram . Record ( sw . Elapsed . TotalSeconds ,
135+ [ new ( "operation" , "sync" ) ] ) ;
35136 }
36137
37138 private async Task Upload ( SyncPlan plan , Cancel ctx )
38139 {
39140 var uploadRequests = plan . AddRequests . Cast < UploadRequest > ( ) . Concat ( plan . UpdateRequests ) . ToList ( ) ;
40141 if ( uploadRequests . Count > 0 )
41142 {
42- _logger . LogInformation ( "Starting to process {Count} uploads using directory upload" , uploadRequests . Count ) ;
143+ using var uploadActivity = ApplyStrategyActivitySource . StartActivity ( "upload files" , ActivityKind . Client ) ;
144+ _ = uploadActivity ? . SetTag ( "docs.sync.upload.count" , uploadRequests . Count ) ;
145+
146+ var addCount = plan . AddRequests . Count ;
147+ var updateCount = plan . UpdateRequests . Count ;
148+
149+ _logger . LogInformation ( "Starting to process {AddCount} new files and {UpdateCount} updated files" , addCount , updateCount ) ;
150+
151+ // Emit file-level metrics (low cardinality) and logs for each file
152+ foreach ( var upload in uploadRequests )
153+ {
154+ var operation = plan . AddRequests . Contains ( upload ) ? "add" : "update" ;
155+ var fileSize = context . WriteFileSystem . FileInfo . New ( upload . LocalPath ) . Length ;
156+ var extension = Path . GetExtension ( upload . DestinationPath ) . ToLowerInvariant ( ) ;
157+
158+ // Record counters
159+ if ( operation == "add" )
160+ FilesAddedCounter . Add ( 1 ) ;
161+ else
162+ FilesUpdatedCounter . Add ( 1 ) ;
163+
164+ // Record file size distribution
165+ FileSizeHistogram . Record ( fileSize , [ new ( "operation" , operation ) ] ) ;
166+
167+ // Record by extension (low cardinality)
168+ if ( ! string . IsNullOrEmpty ( extension ) )
169+ {
170+ FilesByExtensionCounter . Add ( 1 ,
171+ new ( "operation" , operation ) ,
172+ new ( "extension" , extension ) ) ;
173+ }
174+
175+ // Log individual file operations for detailed analysis
176+ LogFileOperation ( _logger , operation , upload . DestinationPath , fileSize ) ;
177+ }
178+
43179 var tempDir = Path . Combine ( context . WriteFileSystem . Path . GetTempPath ( ) , context . WriteFileSystem . Path . GetRandomFileName ( ) ) ;
44180 _ = context . WriteFileSystem . Directory . CreateDirectory ( tempDir ) ;
45181 try
@@ -61,10 +197,11 @@ private async Task Upload(SyncPlan plan, Cancel ctx)
61197 UploadFilesConcurrently = true
62198 } ;
63199 directoryRequest . UploadDirectoryProgressEvent += DisplayProgress ;
64- _logger . LogInformation ( "Uploading {Count} files to S3" , uploadRequests . Count ) ;
200+ _logger . LogInformation ( "Uploading {Count} files to S3 bucket {BucketName} " , uploadRequests . Count , bucketName ) ;
65201 _logger . LogDebug ( "Starting directory upload from {TempDir}" , tempDir ) ;
66202 await transferUtility . UploadDirectoryAsync ( directoryRequest , ctx ) ;
67- _logger . LogDebug ( "Directory upload completed" ) ;
203+ _logger . LogInformation ( "Successfully uploaded {Count} files ({AddCount} added, {UpdateCount} updated)" ,
204+ uploadRequests . Count , addCount , updateCount ) ;
68205 }
69206 finally
70207 {
@@ -81,6 +218,31 @@ private async Task Delete(SyncPlan plan, Cancel ctx)
81218 var deleteRequests = plan . DeleteRequests . ToList ( ) ;
82219 if ( deleteRequests . Count > 0 )
83220 {
221+ using var deleteActivity = ApplyStrategyActivitySource . StartActivity ( "delete files" , ActivityKind . Client ) ;
222+ _ = deleteActivity ? . SetTag ( "docs.sync.delete.count" , deleteRequests . Count ) ;
223+
224+ _logger . LogInformation ( "Starting to delete {Count} files from S3 bucket {BucketName}" , deleteRequests . Count , bucketName ) ;
225+
226+ // Emit file-level metrics (low cardinality) and logs for each file
227+ foreach ( var delete in deleteRequests )
228+ {
229+ var extension = Path . GetExtension ( delete . DestinationPath ) . ToLowerInvariant ( ) ;
230+
231+ // Record counter
232+ FilesDeletedCounter . Add ( 1 ) ;
233+
234+ // Record by extension (low cardinality)
235+ if ( ! string . IsNullOrEmpty ( extension ) )
236+ {
237+ FilesByExtensionCounter . Add ( 1 ,
238+ new ( "operation" , "delete" ) ,
239+ new ( "extension" , extension ) ) ;
240+ }
241+
242+ // Log individual file operations for detailed analysis
243+ LogFileOperation ( _logger , "delete" , delete . DestinationPath , 0 ) ;
244+ }
245+
84246 // Process deletes in batches of 1000 (AWS S3 limit)
85247 foreach ( var batch in deleteRequests . Chunk ( 1000 ) )
86248 {
@@ -95,16 +257,22 @@ private async Task Delete(SyncPlan plan, Cancel ctx)
95257 var response = await s3Client . DeleteObjectsAsync ( deleteObjectsRequest , ctx ) ;
96258 if ( response . HttpStatusCode != System . Net . HttpStatusCode . OK )
97259 {
260+ _logger . LogError ( "Delete batch failed with status code {StatusCode}" , response . HttpStatusCode ) ;
98261 foreach ( var error in response . DeleteErrors )
262+ {
263+ _logger . LogError ( "Failed to delete {Key}: {Message}" , error . Key , error . Message ) ;
99264 collector . EmitError ( error . Key , $ "Failed to delete: { error . Message } ") ;
265+ }
100266 }
101267 else
102268 {
103269 var newCount = Interlocked . Add ( ref deleteCount , batch . Length ) ;
104- _logger . LogInformation ( "Deleted {Count} objects ({DeleteCount }/{TotalDeleteCount })" ,
270+ _logger . LogInformation ( "Deleted {BatchCount} files ({CurrentCount }/{TotalCount })" ,
105271 batch . Length , newCount , deleteRequests . Count ) ;
106272 }
107273 }
274+
275+ _logger . LogInformation ( "Successfully deleted {Count} files" , deleteCount ) ;
108276 }
109277 }
110278}
0 commit comments