diff --git a/plugin/input/file/worker.go b/plugin/input/file/worker.go index 18bb6738d..96485a104 100644 --- a/plugin/input/file/worker.go +++ b/plugin/input/file/worker.go @@ -202,6 +202,14 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSi job.tail = append(job.tail[:0], accumBuf...) job.curOffset += readTotal + // Flush remaining data when EOF is reached and there's no trailing newline. + // Without this, the last line of a file that doesn't end with '\n' is silently dropped. + if isEOFReached && len(accumBuf) > 0 { + job.lastEventSeq = controller.In(sourceID, sourceName, pipeline.NewOffsets(lastOffset+scanned, offsets), accumBuf, isVirgin, metadataInfo) + accumBuf = accumBuf[:0] + job.tail = job.tail[:0] + } + // check if file was truncated. if isEOFReached { err := w.processEOF(file, job, jobProvider, lastOffset+readTotal) diff --git a/plugin/input/file/worker_test.go b/plugin/input/file/worker_test.go index e3defc21a..8a307a56f 100644 --- a/plugin/input/file/worker_test.go +++ b/plugin/input/file/worker_test.go @@ -81,11 +81,18 @@ func TestWorkerWork(t *testing.T) { expData: "abc\n", }, { - name: "should_ok_when_read_1_line_without_newline", + name: "should_emit_last_line_without_trailing_newline", maxEventSize: 1024, inFile: "abc", readBufferSize: 1024, - expData: "", + expData: "abc", + }, + { + name: "should_emit_last_line_among_multiple_without_trailing_newline", + maxEventSize: 1024, + inFile: "line1\nline2\nline3", + readBufferSize: 1024, + expData: "line3", }, } for _, tt := range tests {