Conversation

@Mangaal Mangaal commented Sep 16, 2025

This PR introduces a unidirectional (agent → principal) log streaming service and wires it into the resource-proxy path so that the Principal can serve Kubernetes pod logs to the Argo CD UI. The Agent handles both static logs (follow=false) and live streaming (follow=true) with resume support.

What’s included:

  • New LogStreaming service (gRPC) — Agent opens a client-streaming RPC and pushes log chunks keyed by request_uuid; Principal writes directly to the HTTP response stream and returns a final status when the stream ends.
  • Principal resource-proxy integration — /…/pods/{name}/log requests are recognised, the HTTP writer is registered, and a log event is enqueued to the Agent.
  • Agent log workers — static and live log handlers; time-window flush or 64KiB chunk flush; live streaming has resume (SinceTime) on transient errors.

Key features:

  • Principal LogStream gRPC server & HTTP bridge.
  • Agent log streaming implementation (static + live + resume).
  • Principal resource proxy: log-subresource branch & handoff to LogStream.

Assisted-by: Cursor/Gemini etc

logs.mov

Summary by CodeRabbit

  • New Features
    • Add container log retrieval over HTTP with server-side streaming for static and live ("follow") logs.
    • Resume-capable live streaming with reconnects, timestamp-based resume, duplicate-request protection, and auth-aware retry behavior.
  • Tests
    • New unit and end-to-end tests covering log streaming, resume semantics, Kubernetes log retrieval, and HTTP/gRPC integration.

@Mangaal Mangaal force-pushed the log-streaming branch 2 times, most recently from 4d5e132 to 30aab14 Compare September 16, 2025 14:58
@Mangaal Mangaal closed this Sep 16, 2025
@Mangaal Mangaal reopened this Sep 16, 2025
Signed-off-by: Mangaal <[email protected]>
(cherry picked from commit 2a08301)
Signed-off-by: Mangaal <[email protected]>
(cherry picked from commit d07df62)
Signed-off-by: Mangaal <[email protected]>
(cherry picked from commit 161f2a4)
Signed-off-by: Mangaal <[email protected]>
(cherry picked from commit 30aab14)
Signed-off-by: Mangaal <[email protected]>
(cherry picked from commit f8a6666)
Signed-off-by: Mangaal <[email protected]>
(cherry picked from commit e820c35)

codecov-commenter commented Sep 17, 2025

Codecov Report

❌ Patch coverage is 44.61248% with 293 lines in your changes missing coverage. Please review.
✅ Project coverage is 45.52%. Comparing base (22377e0) to head (7d3ecab).

Files with missing lines (patch coverage):
  • agent/log.go: 33.87% (152 missing, 12 partials) ⚠️
  • principal/resource.go: 12.50% (49 missing, 7 partials) ⚠️
  • internal/event/event.go: 0.00% (43 missing) ⚠️
  • principal/apis/logstreamapi/logstream.go: 83.13% (24 missing, 4 partials) ⚠️
  • agent/inbound.go: 0.00% (2 missing) ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #569      +/-   ##
==========================================
- Coverage   45.56%   45.52%   -0.05%     
==========================================
  Files          90       92       +2     
  Lines       10011    10519     +508     
==========================================
+ Hits         4562     4789     +227     
- Misses       4978     5241     +263     
- Partials      471      489      +18     

@Mangaal Mangaal marked this pull request as ready for review September 17, 2025 12:01
@chetan-rns
Collaborator

@Mangaal I see this error intermittently on the UI. Works fine after requesting the logs again. I guess we are not handling EOF somewhere?

Get "https://rathole-container-internal:9090/api/v1/namespaces/
guestbook/pods/kustomize-guestbook-ui-7689b675bc-cbv8h/log?container=guestbook-ui&follow=true&
tailLines=1000&timestamps=true": EOF

@Mangaal
Collaborator Author

Mangaal commented Sep 30, 2025

@chetan-rns, thanks for reviewing my PR. I've updated it and addressed your suggestions. Please take a look when you get a chance.

Collaborator

@chetan-rns chetan-rns left a comment

@Mangaal Sorry for the delay. Added a few questions around simplifying the agent logic. IMO, the agent should only propagate the options from the argocd server to the client-go's GetLogs(). Read the bytes from the reader until EOF, forward them back in chunks, and return any errors. We can avoid extracting timestamps to modify the sinceTime dynamically. I think we can rely on the argocd server to handle the chunks. This way the agent doesn't have to do any extra work. WDYT @jannfis

Collaborator

@chetan-rns chetan-rns left a comment

Thanks @Mangaal! The overall PR looks good to me.

@jannfis @jgwest Please take a look when you have a moment.

@jannfis
Collaborator

jannfis commented Oct 31, 2025

@coderabbitai full review

coderabbitai bot commented Oct 31, 2025

✅ Actions performed

Full review triggered.

coderabbitai bot commented Oct 31, 2025

Walkthrough

Adds container log streaming: new ContainerLogRequest event and payload, agent-side static and resumable live log workflows with in‑flight dedupe, a client‑streaming gRPC LogStream API and server with HTTP writer registration, tests/mocks, e2e log retrieval helpers, and a backoff dependency.

Changes

Cohort / File(s) Summary
Agent core & inbound
agent/agent.go, agent/inbound.go
Agent gains inflightMu sync.Mutex and inflightLogs map[string]struct{}; inbound event switch routes TargetContainerLog to a.processIncomingContainerLogRequest(ev).
Agent log implementation
agent/log.go
New end-to-end log handling: processIncomingContainerLogRequest, startLogStreamIfNew dedupe, handleStaticLogs, handleLiveStreaming/streamLogsWithResume, gRPC client creation, Kubernetes PodLog stream creation, chunked streaming, timestamp extraction, backoff/retry, and error semantics.
Agent tests & helpers
agent/log_test.go
Added MockLogStreamClient, MockReadCloser, test agents/helpers and tests for timestamp extraction, kube stream creation, dedupe behavior, static/live streaming, cancellation, and lifecycle cases.
Event model
internal/event/event.go
Added TargetContainerLog, ContainerLogRequest struct, Event.ContainerLogRequest() extractor, and EventSource.NewLogRequestEvent builder; updated target recognition and param parsing.
Principal server wiring
principal/server.go, principal/listen.go
Server now holds logStream *logstream.Server; LogStreamService registered with gRPC server.
Principal resource handling
principal/resource.go
Special-case log subresource: validate namespace/pod, register HTTP writer/session, emit log event, support follow (stream) vs static semantics, wait/timeout handling, and client-disconnect behavior.
LogStream protobuf
principal/apis/logstreamapi/logstream.proto
New proto: LogStreamData (request_uuid, data, eof, error), LogStreamResponse, and LogStreamService with client‑streaming StreamLogs.
LogStream server impl
principal/apis/logstreamapi/logstream.go
New exported logstream.Server with session map, RegisterHTTP, StreamLogs RPC, message processing (data/EOF/errors), WaitForCompletion, safe flushing, cancellation propagation, and session finalization.
LogStream server tests
principal/apis/logstreamapi/logstream_test.go
Tests for server lifecycle, HTTP registration, streaming workflows (data, EOF, agent errors), cancellation, completion waiting, panic safety, and concurrency.
LogStream mocks
principal/apis/logstreamapi/mock/mock.go
Added MockLogStreamServer, MockHTTPResponseWriter, PanicFlusher, and MockWriterWithoutFlusher for unit tests.
E2E client helpers
test/e2e/fixture/argoclient.go
Added GetLogs and GetApplicationLogs methods to fetch pod/application logs with container and tailLines query parameters.
E2E tests
test/e2e/logs_test.go
New LogsStreamingTestSuite with tests that create apps, wait for sync/health, locate pods, fetch logs, and assert non-empty logs for managed and autonomous clusters.
Dependencies
go.mod
Added github.com/cenkalti/backoff/v4 v4.3.0.
Tooling
hack/generate-proto.sh
Added proto generation target for principal/apis/logstreamapi.
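
The in-flight dedupe described for agent/agent.go and agent/log.go (duplicate ContainerLogRequest events for the same request UUID are dropped while a stream is live) can be sketched in a few lines. The `inflight` type and `startIfNew` name are illustrative, not the PR's actual identifiers:

```go
package main

import (
	"fmt"
	"sync"
)

// inflight tracks request UUIDs with active log streams so that a second
// ContainerLogRequest for the same UUID is rejected until the first
// stream's cleanup runs. Hypothetical simplification of the PR's
// mutex-guarded in-flight map.
type inflight struct {
	mu   sync.Mutex
	runs map[string]struct{}
}

// startIfNew registers uuid and returns a cleanup func, or reports that a
// stream for uuid is already running.
func (f *inflight) startIfNew(uuid string) (cleanup func(), started bool) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if _, ok := f.runs[uuid]; ok {
		return nil, false // duplicate request: already streaming
	}
	f.runs[uuid] = struct{}{}
	return func() {
		f.mu.Lock()
		delete(f.runs, uuid)
		f.mu.Unlock()
	}, true
}

func main() {
	f := &inflight{runs: map[string]struct{}{}}
	cleanup, ok := f.startIfNew("req-1")
	fmt.Println(ok) // true: first request starts
	_, dup := f.startIfNew("req-1")
	fmt.Println(dup) // false: duplicate rejected
	cleanup()
	_, again := f.startIfNew("req-1")
	fmt.Println(again) // true: allowed again after cleanup
}
```

In the PR the cleanup also invokes the stream's context cancel func; for a static request it is deferred, while for live streaming it is handed to the goroutine.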

Sequence Diagram(s)

sequenceDiagram
    autonumber
    participant Client as HTTP Client
    participant Principal as Principal (HTTP + LogStream gRPC)
    participant Agent as Agent
    participant K8s as Kubernetes API

    Client->>Principal: GET /resources/{ns}/{pod}/log?requestUUID&follow
    Principal->>Principal: RegisterHTTP(requestUUID) → create session & writer
    Principal->>Agent: Emit ContainerLogRequest event (requestUUID, follow,...)

    Agent->>Agent: startLogStreamIfNew(requestUUID) (dedupe)
    alt Static (follow=false)
        Agent->>K8s: PodLogs(follow=false, options)
        K8s-->>Agent: log bytes, then EOF
        Agent->>Principal: Stream gRPC chunks (data + EOF)
        Principal->>Client: write bytes, flush, close
    else Live (follow=true)
        Agent->>K8s: PodLogs(follow=true, sinceTime/tail)
        loop resume & retry (backoff)
            K8s-->>Agent: incremental log bytes (with timestamps)
            Agent->>Principal: Stream gRPC chunks
            Principal->>Client: write & flush bytes
        end
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

  • Focus review on:
    • agent/log.go — streaming logic, resume/backoff, timestamp parsing, inflight dedupe and mutex usage
    • principal/resource.go — HTTP writer/session lifecycle, blocking vs streaming logic, timeouts
    • principal/apis/logstreamapi/logstream.go — session concurrency, safeFlush, error/cancel propagation
    • Tests/mocks — correctness of EOF/error signaling and timing-sensitive behavior

Poem

🐰 I tunneled bytes from pod to shore,

timestamps stitched, I hop once more.
Backoff hums, retries in tune,
streams resume beneath the moon.
Logs leap out — a rabbit's boon!

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 70.00%, below the required 80.00% threshold. Run @coderabbitai generate docstrings to improve coverage.
✅ Passed checks (2 passed)
  • Description Check: ✅ Passed (skipped because CodeRabbit's high-level summary is enabled).
  • Title Check: ✅ Passed. The title 'feat(logstream): Log streaming for argocd agent' clearly and accurately summarizes the primary change.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f957238 and 66bace8.

⛔ Files ignored due to path filters (3)
  • go.sum is excluded by !**/*.sum
  • pkg/api/grpc/logstreamapi/logstream.pb.go is excluded by !**/*.pb.go
  • pkg/api/grpc/logstreamapi/logstream_grpc.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (15)
  • agent/agent.go (1 hunks)
  • agent/inbound.go (1 hunks)
  • agent/log.go (1 hunks)
  • agent/log_test.go (1 hunks)
  • go.mod (1 hunks)
  • internal/event/event.go (5 hunks)
  • principal/apis/logstream/logstream.go (1 hunks)
  • principal/apis/logstream/logstream.proto (1 hunks)
  • principal/apis/logstream/logstream_test.go (1 hunks)
  • principal/apis/logstream/mock/mock.go (1 hunks)
  • principal/listen.go (2 hunks)
  • principal/resource.go (2 hunks)
  • principal/server.go (3 hunks)
  • test/e2e/fixture/argoclient.go (1 hunks)
  • test/e2e/logs_test.go (1 hunks)
🧰 Additional context used
🪛 Buf (1.59.0)
principal/apis/logstream/logstream.proto

17-17: Files with package "logstreamapi" must be within a directory "logstreamapi" relative to root but were in directory "principal/apis/logstream".

(PACKAGE_DIRECTORY_MATCH)

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
principal/apis/logstreamapi/logstream.proto (1)

17-20: Fix Buf PACKAGE_DIRECTORY_MATCH failure.

buf lint still reports PACKAGE_DIRECTORY_MATCH because the proto package logstreamapi must live directly under a logstreamapi directory relative to the module root, while the current path principal/apis/logstreamapi still violates that rule. Adjust either the package declaration (e.g. package principal.apis.logstreamapi;) or the directory/module configuration so the package path and filesystem layout align; otherwise, Buf-based lint/codegen will continue to fail. Based on static analysis hints

🧹 Nitpick comments (1)
principal/apis/logstreamapi/logstream.go (1)

289-293: Lower the per-chunk log level to avoid flooding principal logs.

Writing an Info entry for every chunk sent to the HTTP client will spam logs during live streams and adds measurable overhead. Please drop this log or demote it to Trace, keeping high-volume traffic out of the default log level.

-	logCtx.WithFields(logrus.Fields{
-		"data_length": len(data),
-		"request_id":  reqID,
-	}).Info("HTTP write and flush successful")
+	logCtx.WithFields(logrus.Fields{
+		"data_length": len(data),
+		"request_id":  reqID,
+	}).Trace("HTTP write and flush successful")
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 66bace8 and 6f649a0.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (12)
  • agent/agent.go (2 hunks)
  • agent/inbound.go (1 hunks)
  • agent/log.go (1 hunks)
  • agent/log_test.go (1 hunks)
  • go.mod (1 hunks)
  • internal/event/event.go (5 hunks)
  • principal/apis/logstreamapi/logstream.go (1 hunks)
  • principal/apis/logstreamapi/logstream.proto (1 hunks)
  • principal/apis/logstreamapi/logstream_test.go (1 hunks)
  • principal/apis/logstreamapi/mock/mock.go (1 hunks)
  • principal/server.go (3 hunks)
  • test/e2e/fixture/argoclient.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • go.mod
  • agent/agent.go
🧰 Additional context used
🧬 Code graph analysis (6)
principal/apis/logstreamapi/logstream_test.go (2)
principal/apis/logstreamapi/logstream.go (1)
  • NewServer (66-71)
principal/apis/logstreamapi/mock/mock.go (4)
  • NewMockHTTPResponseWriter (119-123)
  • MockWriterWithoutFlusher (168-172)
  • NewMockLogStreamServer (41-46)
  • PanicFlusher (161-161)
principal/server.go (1)
principal/apis/logstreamapi/logstream.go (2)
  • Server (32-36)
  • NewServer (66-71)
principal/apis/logstreamapi/logstream.go (2)
principal/server.go (2)
  • Server (71-164)
  • NewServer (183-388)
pkg/api/grpc/logstreamapi/logstream_grpc.pb.go (1)
  • LogStreamService_StreamLogsServer (104-108)
agent/log_test.go (5)
principal/apis/logstreamapi/mock/mock.go (2)
  • MockLogStreamServer (30-39)
  • NewMockLogStreamServer (41-46)
pkg/api/grpc/logstreamapi/logstream.pb.go (6)
  • LogStreamData (24-37)
  • LogStreamData (52-52)
  • LogStreamData (67-69)
  • LogStreamResponse (100-109)
  • LogStreamResponse (124-124)
  • LogStreamResponse (139-141)
agent/agent.go (1)
  • Agent (62-116)
test/fake/kube/kubernetes.go (1)
  • NewKubernetesFakeClientWithResources (69-78)
internal/event/event.go (1)
  • ContainerLogRequest (959-973)
agent/inbound.go (1)
internal/event/event.go (1)
  • TargetContainerLog (85-85)
agent/log.go (4)
agent/agent.go (1)
  • Agent (62-116)
internal/event/event.go (1)
  • ContainerLogRequest (959-973)
pkg/api/grpc/logstreamapi/logstream.pb.go (3)
  • LogStreamData (24-37)
  • LogStreamData (52-52)
  • LogStreamData (67-69)
pkg/api/grpc/logstreamapi/logstream_grpc.pb.go (1)
  • LogStreamService_StreamLogsClient (46-50)
🪛 Buf (1.59.0)
principal/apis/logstreamapi/logstream.proto

17-17: Files with package "logstreamapi" must be within a directory "logstreamapi" relative to root but were in directory "principal/apis/logstreamapi".

(PACKAGE_DIRECTORY_MATCH)

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (2)
agent/log.go (2)

238-246: Consider propagating actual error details to the principal.

Line 244 sends a generic error message "log stream read failed" to the principal. Sending the actual error message (e.g., err.Error()) would aid debugging on the principal/UI side, similar to how line 119 and line 294 already do this for other error paths.

Apply this diff if you want more specific error messages:

 		if errors.Is(err, io.EOF) {
 			_ = stream.Send(&logstreamapi.LogStreamData{RequestUuid: logReq.UUID, Eof: true})
 			return nil
 		}
 		logCtx.WithError(err).Error("Error reading log stream")
-		_ = stream.Send(&logstreamapi.LogStreamData{RequestUuid: logReq.UUID, Error: "log stream read failed"})
+		_ = stream.Send(&logstreamapi.LogStreamData{RequestUuid: logReq.UUID, Error: err.Error()})
 		return err

268-272: Consider extracting the hardcoded overlap duration to a named constant.

The -100ms overlap at line 270 ensures no log lines are lost during resume, which is important. However, it's hardcoded and lacks explanation. Extracting it to a named constant would improve readability and make the intent clear.

For example, add this constant near line 251:

const (
	waitForReconnect  = 10 * time.Second // how long we poll IsConnected() after Unauthenticated
	pollEvery         = 1 * time.Second
	resumeOverlap     = 100 * time.Millisecond // overlap to prevent losing log lines during resume
)

Then update line 270:

 		resumeReq := *logReq
 		if lastTimestamp != nil {
-			t := lastTimestamp.Add(-100 * time.Millisecond)
+			t := lastTimestamp.Add(-resumeOverlap)
 			resumeReq.SinceTime = t.Format(time.RFC3339)
 		}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6f649a0 and 0bcdf95.

📒 Files selected for processing (1)
  • agent/log.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
agent/log.go (4)
agent/agent.go (1)
  • Agent (62-116)
internal/event/event.go (1)
  • ContainerLogRequest (959-973)
pkg/api/grpc/logstreamapi/logstream.pb.go (3)
  • LogStreamData (24-37)
  • LogStreamData (52-52)
  • LogStreamData (67-69)
pkg/api/grpc/logstreamapi/logstream_grpc.pb.go (2)
  • LogStreamService_StreamLogsClient (46-50)
  • NewLogStreamServiceClient (33-35)
🔇 Additional comments (10)
agent/log.go (10)

17-34: LGTM!

The imports are well-organized and all dependencies are appropriate for the log streaming functionality.


36-58: LGTM!

Clean entry point with appropriate error handling and structured logging.


60-97: LGTM!

The duplicate detection and lifecycle management logic is correct. The cleanup function properly invokes cancel() and removes the inflight entry, addressing the critical issues from previous reviews.


148-165: LGTM!

The goroutine correctly defers cleanup() at line 156, ensuring the inflight entry is always released when the live stream ends. The panic handler only logs the panic, relying on the deferred cleanup for resource management—this is the correct pattern.


167-176: LGTM!

Simple and correct helper for creating the gRPC log stream.


178-201: LGTM!

The Kubernetes log stream configuration is correct. Setting Timestamps to true is essential for the resume capability, and the SinceTime parsing properly supports retry logic.


282-286: Verify that the initial empty data message is necessary.

The code sends an empty LogStreamData message at the start of each attempt for "health checks" (lines 282-286). While the comment explains this, it's worth confirming whether the principal actually requires this initial message or if it could be omitted to simplify the protocol.

If the principal doesn't strictly require this empty message, consider removing it to simplify the streaming flow.


322-346: LGTM, but consider simplifying the reconnection wait loop if possible.

The authentication failure handling is sophisticated and correct: it pauses exponential backoff and waits for reconnection by polling IsConnected(). The manual ticker loop (lines 327-344) works correctly, though it could potentially be simplified. Given the complexity is warranted for reliable auth handling, this is acceptable as-is.


402-408: Verify EOF handling is intentional for live streaming.

The code does NOT send an EOF message when rc.Read() returns io.EOF (lines 402-408), instead treating it as an error. In contrast, streamLogsToCompletion (line 239) handles EOF as clean termination. For live streaming with follow=true, is it intentional to treat EOF as an error and retry?

This might be related to the intermittent EOF errors reported in the PR comments. If the Kubernetes stream returns EOF prematurely (e.g., during reconnection), treating it as an error would trigger a retry with the resume logic, which seems correct. However, sending err.Error() (which would be "EOF") to the principal as an error message might surface in the UI.

Consider adding explicit EOF handling to distinguish between expected EOF (if any) and unexpected errors:

if err != nil {
	if errors.Is(err, io.EOF) {
		// For follow=true, EOF from k8s might indicate temporary disruption
		// Let the resume logic handle it
		logCtx.Info("EOF from k8s stream, will resume")
		return lastTimestamp, err
	}
	logCtx.WithError(err).Error("Error reading log stream")
	_ = stream.Send(&logstreamapi.LogStreamData{RequestUuid: logReq.UUID, Error: err.Error()})
	return lastTimestamp, err
}

412-438: LGTM!

The timestamp extraction logic is well-optimized with appropriate guards and early exits. Trying both RFC3339Nano and RFC3339 formats ensures compatibility with Kubernetes timestamp variations.
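
The timestamp extraction the review approves of (find the last complete line in a chunk fetched with timestamps=true, try RFC3339Nano then RFC3339) can be illustrated with a small stdlib sketch; `lastTimestamp` is a hypothetical helper, not the PR's function:

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// lastTimestamp scans backwards through the complete lines of a log chunk
// and returns the timestamp prefix of the most recent line it can parse.
// Kubelet prefixes each line with an RFC3339Nano stamp when logs are
// requested with timestamps=true.
func lastTimestamp(chunk string) (time.Time, bool) {
	lines := strings.Split(strings.TrimRight(chunk, "\n"), "\n")
	for i := len(lines) - 1; i >= 0; i-- {
		tok, _, ok := strings.Cut(lines[i], " ")
		if !ok {
			continue // no space: no timestamp prefix on this line
		}
		for _, layout := range []string{time.RFC3339Nano, time.RFC3339} {
			if t, err := time.Parse(layout, tok); err == nil {
				return t, true
			}
		}
	}
	return time.Time{}, false
}

func main() {
	chunk := "2025-09-16T14:58:00.5Z hello\n2025-09-16T14:58:01.25Z world\n"
	t, ok := lastTimestamp(chunk)
	fmt.Println(ok, t.Format(time.RFC3339Nano))
}
```

The returned timestamp is what the resume path would feed back as SinceTime (minus a small overlap) after a transient failure.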

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
agent/log.go (1)

99-146: Remove redundant defer rc.Close() at line 123.

The rc.Close() is deferred at both line 123 and inside streamLogsToCompletion at line 213, resulting in a double-close. While most io.ReadCloser implementations handle this gracefully, calling Close() twice is a code smell and not guaranteed safe across all implementations.

Apply this diff to remove the redundant defer:

 	// Create Kubernetes log stream
 	rc, err := a.createKubernetesLogStream(ctx, logReq)
 	if err != nil {
 		_ = stream.Send(&logstreamapi.LogStreamData{RequestUuid: logReq.UUID, Eof: true, Error: err.Error()})
 		_, _ = stream.CloseAndRecv()
 		return err
 	}
-	defer rc.Close()
 
 	err = a.streamLogsToCompletion(ctx, stream, rc, logReq, logCtx)
🧹 Nitpick comments (2)
agent/log.go (2)

280-286: Clarify the purpose of the initial empty message.

The comment "Used for health checks" may be misleading. This initial message appears to establish the stream connection rather than perform a health check. Consider clarifying the comment to accurately reflect its purpose (e.g., "Send initial message to establish stream connection").


383-392: Consider adding a comment to explain the timestamp extraction logic.

The logic for extracting the timestamp from the last complete line in the buffer is correct but somewhat dense. A brief comment explaining why we search backwards for the last complete line (to enable resume from the most recent known timestamp) would improve maintainability.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0bcdf95 and 17637b3.

⛔ Files ignored due to path filters (2)
  • pkg/api/grpc/logstreamapi/logstream.pb.go is excluded by !**/*.pb.go
  • pkg/api/grpc/logstreamapi/logstream_grpc.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (4)
  • agent/log.go (1 hunks)
  • hack/generate-proto.sh (1 hunks)
  • principal/apis/logstreamapi/logstream.proto (1 hunks)
  • principal/server.go (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • principal/server.go
🧰 Additional context used
🧬 Code graph analysis (1)
agent/log.go (4)
agent/agent.go (1)
  • Agent (62-116)
internal/event/event.go (1)
  • ContainerLogRequest (959-973)
pkg/api/grpc/logstreamapi/logstream.pb.go (3)
  • LogStreamData (38-51)
  • LogStreamData (66-66)
  • LogStreamData (81-83)
pkg/api/grpc/logstreamapi/logstream_grpc.pb.go (1)
  • LogStreamService_StreamLogsClient (46-50)
🔇 Additional comments (7)
hack/generate-proto.sh (1)

26-26: Proto code generation target addition looks good.

The new logstreamapi entry follows the established pattern and will be processed identically to existing API targets by the generation loop.

principal/apis/logstreamapi/logstream.proto (2)

17-19: Proto package-to-directory alignment appears to resolve prior Buf lint issue.

The file location principal/apis/logstreamapi/logstream.proto now correctly matches the declared package principal.apis.logstreamapi and go_package option, satisfying Buf's PACKAGE_DIRECTORY_MATCH rule that was flagged in the previous review.


22-44: Proto schema structure aligns with unidirectional client-streaming design.

LogStreamData messages (with request_uuid correlation, data payload, eof marker, and error field) enable the agent to push log chunks to the principal, which the principal writes directly to HTTP response streams. The single LogStreamResponse returned after stream closure provides final status and statistics. The StreamLogs RPC's client-streaming semantics correctly implement the agent→principal push model described in the PR objectives.

agent/log.go (4)

60-97: LGTM: Cleanup and duplicate detection logic is correct.

The in-flight tracking with cleanup function properly calls cancel() before deleting the map entry, and the separation between static (with deferred cleanup) and live streaming (with cleanup passed to goroutine) correctly handles resource lifecycle for both paths.


148-165: LGTM: Live streaming goroutine properly defers cleanup.

The goroutine correctly defers cleanup() at the top, ensuring the in-flight entry and context cancellation are released when streaming ends, regardless of how it terminates (success, error, or panic).


250-360: LGTM: Resume logic with exponential backoff is well-designed.

The error handling properly distinguishes between terminal errors (Canceled/NotFound), authentication errors requiring reconnection wait, and transient errors handled with exponential backoff. The 100ms timestamp overlap prevents log loss during resume.


411-437: LGTM: Timestamp extraction is robust and efficient.

The function properly guards against edge cases (short lines, missing spaces, long tokens) and handles both RFC3339 and RFC3339Nano formats. The fast-path checks prevent unnecessary parsing attempts.

Signed-off-by: Mangaal <[email protected]>

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 17637b3 and 5e00613.

📒 Files selected for processing (1)
  • agent/log.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
agent/log.go (4)
agent/agent.go (1)
  • Agent (62-116)
internal/event/event.go (1)
  • ContainerLogRequest (959-973)
pkg/api/grpc/logstreamapi/logstream.pb.go (3)
  • LogStreamData (38-51)
  • LogStreamData (66-66)
  • LogStreamData (81-83)
pkg/api/grpc/logstreamapi/logstream_grpc.pb.go (1)
  • LogStreamService_StreamLogsClient (46-50)

Signed-off-by: Mangaal <[email protected]>

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5e00613 and 152e239.

📒 Files selected for processing (2)
  • agent/log.go (1 hunks)
  • agent/log_test.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
agent/log_test.go (5)
principal/apis/logstreamapi/mock/mock.go (2)
  • MockLogStreamServer (30-39)
  • NewMockLogStreamServer (41-46)
pkg/api/grpc/logstreamapi/logstream.pb.go (6)
  • LogStreamData (38-51)
  • LogStreamData (66-66)
  • LogStreamData (81-83)
  • LogStreamResponse (114-123)
  • LogStreamResponse (138-138)
  • LogStreamResponse (153-155)
agent/agent.go (1)
  • Agent (62-116)
test/fake/kube/kubernetes.go (1)
  • NewKubernetesFakeClientWithResources (69-78)
internal/event/event.go (1)
  • ContainerLogRequest (959-973)
agent/log.go (4)
agent/agent.go (1)
  • Agent (62-116)
internal/event/event.go (1)
  • ContainerLogRequest (959-973)
pkg/api/grpc/logstreamapi/logstream.pb.go (3)
  • LogStreamData (38-51)
  • LogStreamData (66-66)
  • LogStreamData (81-83)
pkg/api/grpc/logstreamapi/logstream_grpc.pb.go (2)
  • LogStreamService_StreamLogsClient (46-50)
  • NewLogStreamServiceClient (33-35)

agent/agent.go Outdated
clusterCache *appstatecache.Cache

inflightMu sync.Mutex
inflightLogs map[string]context.CancelFunc
Member

Two things:

  • Can you add a description of what this field is used for
  • AFAICT the value (cancel func) is never called. Is this intentional?

Collaborator Author

I added this to block duplicate log requests. I've added a comment explaining it.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
agent/log_test.go (2)

83-93: Hardcoded UUID in mock response may cause test issues.

The CloseAndRecv method returns a hardcoded RequestUuid of "test-uuid", but test requests use "test-uuid-123" (line 156). While this doesn't break current tests that don't validate the response UUID, it could lead to confusion or false positives if future tests verify the response.

Consider storing the request UUID in the mock and returning it:

 type MockLogStreamClient struct {
 	*mock.MockLogStreamServer
 	sentData []*logstreamapi.LogStreamData
 	sendFunc func(data *logstreamapi.LogStreamData) error
+	requestUuid string
 	mu       sync.RWMutex
 }

-func NewMockLogStreamClient(ctx context.Context) *MockLogStreamClient {
+func NewMockLogStreamClient(ctx context.Context, requestUuid string) *MockLogStreamClient {
 	return &MockLogStreamClient{
 		MockLogStreamServer: mock.NewMockLogStreamServer(ctx),
 		sentData:            make([]*logstreamapi.LogStreamData, 0),
+		requestUuid:         requestUuid,
 		sendFunc: func(data *logstreamapi.LogStreamData) error {
 			return nil
 		},
 	}
 }

 func (m *MockLogStreamClient) CloseAndRecv() (*logstreamapi.LogStreamResponse, error) {
 	// Simulate closing and receiving a response
 	m.mu.RLock()
 	linesReceived := len(m.sentData)
 	m.mu.RUnlock()
 	return &logstreamapi.LogStreamResponse{
-		RequestUuid:   "test-uuid",
+		RequestUuid:   m.requestUuid,
 		Status:        200,
 		LinesReceived: int32(linesReceived),
 	}, nil
 }

275-283: Test relies on a panic, which indicates incomplete setup.

The test expects a panic when processing a new request, which suggests missing dependencies in the test agent. This makes the test fragile and doesn't properly validate the happy path.

Consider providing a complete mock setup so the test can verify the actual behavior without relying on panics:

 	t.Run("new request", func(t *testing.T) {
 		logReq := createTestLogRequest(false)
-		agent := createTestAgent()
-		// This will panic due to missing dependencies, but we can check if new request is processed
-		assert.Panics(t, func() {
-			agent.startLogStreamIfNew(logReq, logCtx)
-		})
-
+		agent := createTestAgentWithKubeClient()
+		// Provide mock remote connection, or test the specific behavior you want to validate
+		// rather than relying on panic
+		err := agent.startLogStreamIfNew(logReq, logCtx)
+		// Add appropriate assertions based on expected behavior
 	})
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 152e239 and a1ed82e.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (6)
  • agent/agent.go (2 hunks)
  • agent/inbound.go (1 hunks)
  • agent/log.go (1 hunks)
  • agent/log_test.go (1 hunks)
  • go.mod (1 hunks)
  • principal/server.go (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • go.mod
  • agent/agent.go
  • principal/server.go
🧰 Additional context used
🧬 Code graph analysis (3)
agent/inbound.go (1)
internal/event/event.go (1)
  • TargetContainerLog (85-85)
agent/log.go (5)
agent/agent.go (1)
  • Agent (62-117)
internal/event/event.go (1)
  • ContainerLogRequest (959-973)
internal/logging/logfields/logfields.go (1)
  • UUID (30-30)
pkg/api/grpc/logstreamapi/logstream.pb.go (3)
  • LogStreamData (38-51)
  • LogStreamData (66-66)
  • LogStreamData (81-83)
pkg/api/grpc/logstreamapi/logstream_grpc.pb.go (1)
  • LogStreamService_StreamLogsClient (46-50)
agent/log_test.go (7)
principal/apis/logstreamapi/mock/mock.go (2)
  • MockLogStreamServer (30-39)
  • NewMockLogStreamServer (41-46)
pkg/api/grpc/logstreamapi/logstream.pb.go (6)
  • LogStreamData (38-51)
  • LogStreamData (66-66)
  • LogStreamData (81-83)
  • LogStreamResponse (114-123)
  • LogStreamResponse (138-138)
  • LogStreamResponse (153-155)
internal/logging/logfields/logfields.go (3)
  • Status (130-130)
  • UUID (30-30)
  • Name (59-59)
agent/agent.go (1)
  • Agent (62-117)
test/fake/kube/kubernetes.go (1)
  • NewKubernetesFakeClientWithResources (69-78)
internal/event/event.go (2)
  • ContainerLogRequest (959-973)
  • Create (56-56)
principal/apis/logstreamapi/logstream_test.go (1)
  • TestStreamLogs (100-216)
🔇 Additional comments (13)
agent/inbound.go (1)

77-78: LGTM! Container log routing added correctly.

The new event routing for TargetContainerLog follows the established pattern and integrates cleanly with the existing switch statement.

agent/log_test.go (4)

114-127: LGTM! Simple and effective test helper.

The MockReadCloser implementation is clean and provides useful test functionality.


166-210: LGTM! Comprehensive timestamp extraction tests.

The test cases cover the important timestamp formats and edge cases.


287-383: LGTM! Comprehensive streaming tests.

The tests effectively validate both successful streaming and error conditions, including cancellation and send failures.


249-256: Verify test expectation for non-existent pod.

The test expects NoError when requesting logs for a non-existent pod. This behavior should be verified against the actual createKubernetesLogStream implementation and the fake Kubernetes client configuration to determine if the assertion is correct or if an error should be expected.

agent/log.go (8)

36-58: LGTM! Clean entry point with proper context.

The log request processing entry point is well-structured with appropriate logging context and error handling.


60-91: LGTM! Duplicate detection and cleanup properly implemented.

The inflight log management correctly handles both static and live streaming cases, with appropriate cleanup for cancellation and map entry removal.


93-132: LGTM! Static log handling with appropriate error management.

The static log handling correctly manages stream creation, error handling, and authentication failures.


134-148: LGTM! Live streaming with proper cleanup and early ACK.

The live streaming goroutine correctly defers cleanup to ensure resources are released when the stream ends, and returns early ACK to prevent timeout.


150-181: LGTM! Stream creation with proper option handling.

Both gRPC and Kubernetes stream creation are correctly implemented with appropriate option transfer and error handling.


183-231: LGTM! Complete static streaming with proper resource management.

The streaming implementation correctly handles EOF as normal termination, closes the ReadCloser via defer, and checks for context cancellation before blocking reads.


233-333: LGTM! Robust resume logic with appropriate backoff strategy.

The resume implementation correctly handles different error scenarios: intentional stops, authentication failures with reconnection wait, and transient errors with exponential backoff.


335-418: LGTM! Streaming with timestamp extraction and proper resource cleanup.

The streaming implementation correctly closes the ReadCloser, treats EOF as successful termination, and extracts timestamps from log lines for resume capability. The timestamp extraction handles common RFC3339 variants efficiently.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (4)
agent/log_test.go (4)

83-93: Hardcoded UUID in CloseAndRecv reduces test fidelity.

The RequestUuid field is hardcoded to "test-uuid" instead of using the actual UUID from the sent data. This could mask bugs where the wrong UUID is used.

Apply this diff to derive the UUID from sent data:

 func (m *MockLogStreamClient) CloseAndRecv() (*logstreamapi.LogStreamResponse, error) {
 	// Simulate closing and receiving a response
 	m.mu.RLock()
+	requestUUID := "test-uuid"
+	if len(m.sentData) > 0 {
+		requestUUID = m.sentData[0].RequestUuid
+	}
 	linesReceived := len(m.sentData)
 	m.mu.RUnlock()
 	return &logstreamapi.LogStreamResponse{
-		RequestUuid:   "test-uuid",
+		RequestUuid:   requestUUID,
 		Status:        200,
 		LinesReceived: int32(linesReceived),
 	}, nil
 }

273-280: Consider testing actual behavior instead of panic.

The test verifies that startLogStreamIfNew panics when dependencies are missing. This tests an implementation detail rather than behavior. Consider injecting minimal dependencies (e.g., a mock stream client) to test that the function actually initiates log streaming for new requests.


294-307: Clarify the assertion expectation.

The comment states "At least 2 data messages + EOF" but the assertion GreaterOrEqual(t, len(sentData), 2) is ambiguous—it's unclear whether the count includes or excludes the EOF message. Consider either being more explicit in the comment or adding a separate assertion for the EOF message.

Apply this diff to make the expectation clearer:

 		// Verify that data was sent
 		sentData := mockStream.GetSentData()
-		assert.GreaterOrEqual(t, len(sentData), 2) // At least 2 data messages + EOF
+		require.GreaterOrEqual(t, len(sentData), 1, "Expected at least one message")
 
 		// Check that the last message is EOF
 		lastMessage := sentData[len(sentData)-1]
 		assert.True(t, lastMessage.Eof)

328-355: Misleading assertion message.

Line 350 states "Should end with EOF, not timeout", but when streamErr is nil, it means the stream completed successfully, not that it encountered an EOF error. The message is confusing because EOF is a normal termination condition, not an error.

Apply this diff to clarify:

-		assert.Nil(t, streamErr, "Should end with EOF, not timeout")
+		assert.Nil(t, streamErr, "Stream should complete successfully")
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a1ed82e and a743f88.

📒 Files selected for processing (1)
  • agent/log_test.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
agent/log_test.go (6)
principal/apis/logstreamapi/mock/mock.go (2)
  • MockLogStreamServer (30-39)
  • NewMockLogStreamServer (41-46)
pkg/api/grpc/logstreamapi/logstream.pb.go (6)
  • LogStreamData (38-51)
  • LogStreamData (66-66)
  • LogStreamData (81-83)
  • LogStreamResponse (114-123)
  • LogStreamResponse (138-138)
  • LogStreamResponse (153-155)
agent/agent.go (1)
  • Agent (62-117)
test/fake/kube/kubernetes.go (1)
  • NewKubernetesFakeClientWithResources (69-78)
internal/event/event.go (2)
  • ContainerLogRequest (959-973)
  • Create (56-56)
principal/apis/logstreamapi/logstream_test.go (1)
  • TestStreamLogs (100-216)
🔇 Additional comments (5)
agent/log_test.go (5)

114-128: LGTM!

Clean test double with proper close tracking for verification.


129-163: LGTM!

Test helpers properly initialize required fields for agent and log request testing.


165-210: LGTM!

Comprehensive timestamp extraction test covering multiple RFC3339 variants and edge cases.


356-379: LGTM!

Excellent test with error injection via SetSendFunc. This verifies that both the timestamp and error are properly returned when send fails, and confirms that at least one send attempt was made.


249-256: Verify the NoError assertion for non-existent pod requests.

The test asserts NoError when requesting logs for a non-existent pod. Typically, such requests should fail. Confirm whether this behavior is intentional—either the fake Kubernetes client returns no error for missing pods, or the assertion should expect an error instead.

Signed-off-by: Mangaal <[email protected]>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (6)
agent/log_test.go (6)

118-131: MockReadCloser is fine; only add sync if used across goroutines

MockReadCloser is a simple and useful test double. If you ever start asserting IsClosed() from a different goroutine than the one calling Close(), consider wrapping closed with a mutex or atomic.Bool to avoid races under the -race detector. As-is, for single-goroutine use, it’s perfectly adequate.


133-167: Agent test helpers are clear and keep setup DRY

createTestAgent / createTestAgentWithKubeClient / createTestLogRequest nicely encapsulate common setup and keep the tests focused on behavior rather than wiring.

One optional improvement: accept *testing.T and call t.Helper() plus t.Cleanup(cancel) so the contexts are always canceled even if a test fails early.


216-259: Strengthen non-existent pod expectations in TestCreateKubernetesLogStream

The “non-existent pod” subtest only checks that no error is returned; it doesn’t confirm that no stream is opened. If createKubernetesLogStream ever started returning a non-nil ReadCloser for 404s, this test wouldn’t catch it.

-	t.Run("Test createKubernetesLogStream with non-existent pod", func(t *testing.T) {
+	t.Run("Test createKubernetesLogStream with non-existent pod", func(t *testing.T) {
 		agent := createTestAgentWithKubeClient()
 		// Test with a non-existent pod
 		logReqNotFound := createTestLogRequest(false)
 		logReqNotFound.PodName = "non-existent-pod"
-		_, err := agent.createKubernetesLogStream(ctx, logReqNotFound)
-		assert.NoError(t, err)
+		rc, err := agent.createKubernetesLogStream(ctx, logReqNotFound)
+		assert.NoError(t, err)
+		assert.Nil(t, rc, "expected no log stream for non-existent pod")
 	})

261-282: Avoid relying on a panic to assert “new request” behavior

In the “new request” subtest you intentionally expect startLogStreamIfNew to panic due to missing dependencies. That makes the test brittle: once you wire in the remaining dependencies or refactor the function to fail more gracefully, the test will start failing even though behavior is improved.

A more robust pattern is:

  • Construct an Agent with the minimal mocks so startLogStreamIfNew can run without panicking.
  • Assert that the request UUID is added to inflightLogs and that no error is returned (or that a specific, well-defined error is returned).

284-314: Use predicate-style error assertion for context cancellation

The cancellation subtest currently asserts assert.Equal(t, context.Canceled, err). This ties the test to an exact error value; if streamLogsToCompletion ever wraps the context error (while still preserving context.Canceled as the cause), the behavior would be acceptable but the test would fail.

Switching to a predicate-style assertion will keep the test robust to error wrapping while still ensuring you surface cancellations correctly.

-		err := agent.streamLogsToCompletion(cancelCtx, mockStream, reader, logReq, logCtx)
-		assert.Error(t, err)
-		assert.Equal(t, context.Canceled, err)
+		err := agent.streamLogsToCompletion(cancelCtx, mockStream, reader, logReq, logCtx)
+		assert.Error(t, err)
+		// allow for potential error wrapping while still enforcing cancellation semantics
+		assert.ErrorIs(t, err, context.Canceled)

316-362: Isolate fixtures per subtest and relax overly strict data expectations in TestStreamLogs

Two small test-quality nits here:

  1. agent and logReq are shared across subtests. If streamLogs mutates agent state or the request (e.g., setting SinceTime), the second subtest may observe side effects from the first. Creating fresh fixtures per t.Run keeps tests independent.

  2. In the “send failure” subtest, assert.Equal(t, testData, string(sentData[0].Data)) couples the test tightly to the current flush/chunking strategy (single send with the full buffer). If you later switch to line-based or size-based chunking, the behavior would still be semantically correct but this assertion would fail.

You can address both with a small refactor:

-func TestStreamLogs(t *testing.T) {
-	agent := createTestAgentWithKubeClient()
-	ctx := context.Background()
-	logReq := createTestLogRequest(true)
-	logCtx := logrus.NewEntry(logrus.New())
+func TestStreamLogs(t *testing.T) {
+	ctx := context.Background()
+	logCtx := logrus.NewEntry(logrus.New())
@@
-	t.Run("successful streaming with data verification", func(t *testing.T) {
+	t.Run("successful streaming with data verification", func(t *testing.T) {
+		agent := createTestAgentWithKubeClient()
+		logReq := createTestLogRequest(true)
@@
-	t.Run("send failure returns timestamp and error", func(t *testing.T) {
+	t.Run("send failure returns timestamp and error", func(t *testing.T) {
+		agent := createTestAgentWithKubeClient()
+		logReq := createTestLogRequest(true)
@@
-		sentData := mockStream.GetSentData()
-		require.Greater(t, len(sentData), 0, "expected a send attempt before failure")
-		assert.Equal(t, testData, string(sentData[0].Data))
+		sentData := mockStream.GetSentData()
+		require.Greater(t, len(sentData), 0, "expected a send attempt before failure")
+		// Loosen expectation to avoid coupling to exact chunking strategy
+		assert.Contains(t, string(sentData[0].Data), "line 1", "expected at least the first log line to be sent before failure")
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a743f88 and def4ac8.

📒 Files selected for processing (1)
  • agent/log_test.go (1 hunks)
🔇 Additional comments (2)
agent/log_test.go (2)

169-214: TestExtractTimestamp covers key formats and edge cases well

The table-driven tests exercise nanoseconds, milliseconds, plain seconds, missing timestamps, and a tab separator, which gives good confidence in extractTimestamp. The timePtr helper keeps expectations readable.

No issues here from my side.


364-367: timePtr helper is simple and appropriate

Tiny helper, but it keeps expected timestamps readable in the tests above. No changes needed here.

Signed-off-by: Mangaal <[email protected]>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
agent/log_test.go (2)

117-119: Optional: Add mutex protection to SetSendFunc.

While typical test usage sets this during setup before concurrent access, adding mutex protection would make the mock more robust against potential race conditions with Send (line 63).

 func (m *MockLogStreamClient) SetSendFunc(fn func(data *logstreamapi.LogStreamData) error) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
 	m.sendFunc = fn
 }

277-284: Optional: Consider testing with proper setup instead of expecting panic.

Testing for panics typically indicates missing setup rather than validating intended behavior. Consider either:

  1. Properly mocking/setting up the required dependencies (e.g., gRPC stream, logger)
  2. Removing this test if it's not adding meaningful coverage

If the panic is intentional behavior for missing dependencies, consider documenting why this is the expected design.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between def4ac8 and 220ab7d.

📒 Files selected for processing (1)
  • agent/log_test.go (1 hunks)
🔇 Additional comments (12)
agent/log_test.go (12)

14-36: LGTM!

The package declaration and imports are well-organized and appropriate for the test scenarios covered in this file.


38-56: LGTM!

The mock client structure and constructor are well-designed for testing purposes, with sensible defaults and proper initialization.


86-100: LGTM! Previous panic concern resolved.

The CloseAndRecv implementation now safely handles the case when no data has been sent by defaulting to m.requestID and only accessing m.sentData[0] after verifying the slice is non-empty.


121-134: LGTM!

The MockReadCloser provides a clean, minimal implementation for testing with a helpful IsClosed() method for verification.


137-170: LGTM!

The test helper functions provide clean, reusable fixtures for setting up test scenarios.


172-217: LGTM!

The test provides comprehensive coverage of various timestamp formats and edge cases using a clean table-driven approach.


224-253: LGTM!

The test correctly sets up a pod in the fake Kubernetes client and verifies that log stream creation succeeds with proper cleanup.


267-276: LGTM!

The test correctly verifies that duplicate requests are handled gracefully by returning early without error.


287-317: LGTM!

Both test cases properly verify the streaming behavior: successful completion with EOF marker and proper context cancellation handling.


319-365: LGTM!

Both test cases thoroughly validate the streaming behavior:

  1. Successful streaming with timestamp extraction
  2. Proper error handling while preserving the last captured timestamp

The use of ErrorIs and verification of sent data demonstrates good testing practices.


367-370: LGTM!

Simple, clean helper function for creating time pointers in test assertions.


254-261: Verify expected behavior for non-existent pod.

The test expects NoError when requesting logs for a non-existent pod. This requires verification of whether createKubernetesLogStream intentionally handles missing pods gracefully, or if the fake Kubernetes client is masking expected errors that would occur in a real cluster. Confirm the intended behavior and adjust the test assertion accordingly.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 220ab7d and 7d3ecab.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (1)
  • go.mod (1 hunks)
