feat(logstream): Log streaming for argocd agent #569
base: main
Conversation
Force-pushed from 4d5e132 to 30aab14
Signed-off-by: Mangaal <[email protected]> (cherry picked from commit 2a08301)
Signed-off-by: Mangaal <[email protected]> (cherry picked from commit d07df62)
Signed-off-by: Mangaal <[email protected]> (cherry picked from commit 161f2a4)
Signed-off-by: Mangaal <[email protected]> (cherry picked from commit 30aab14)
Signed-off-by: Mangaal <[email protected]> (cherry picked from commit f8a6666)
Signed-off-by: Mangaal <[email protected]> (cherry picked from commit e820c35)
Force-pushed from e820c35 to 3e1c2cd
Signed-off-by: Mangaal <[email protected]>
Codecov Report
❌ Patch coverage
Additional details and impacted files
@@ Coverage Diff @@
## main #569 +/- ##
==========================================
- Coverage 45.56% 45.52% -0.05%
==========================================
Files 90 92 +2
Lines 10011 10519 +508
==========================================
+ Hits 4562 4789 +227
- Misses 4978 5241 +263
- Partials 471 489 +18
☔ View full report in Codecov by Sentry.
@Mangaal I see this error intermittently on the UI. Works fine after requesting the logs again. I guess we are not handling
Signed-off-by: Mangaal <[email protected]>
Signed-off-by: Mangaal <[email protected]>
Signed-off-by: Mangaal <[email protected]>
Signed-off-by: Mangaal <[email protected]>
@chetan-rns, Thanks for reviewing my PR. I’ve updated it and addressed your suggestions. Please take a look when you get a chance.
chetan-rns left a comment
@Mangaal Sorry for the delay. Added a few questions around simplifying the agent logic. IMO, the agent should only propagate the options from the argocd server to the client-go's GetLogs(). Read the bytes from the reader until EOF, forward them back in chunks, and return any errors. We can avoid extracting timestamps to modify the sinceTime dynamically. I think we can rely on the argocd server to handle the chunks. This way the agent doesn't have to do any extra work. WDYT @jannfis
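A minimal sketch of the simplified flow suggested above, assuming a plain client-go PodLogOptions pass-through; the logRequest type and the send callback are illustrative stand-ins, not the PR's actual types:

import (
    "context"
    "errors"
    "io"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/kubernetes"
)

// logRequest stands in for the PR's ContainerLogRequest; field names here are assumptions.
type logRequest struct {
    Namespace string
    PodName   string
    Container string
    Follow    bool
}

// forwardPodLogs propagates the requested options to client-go's GetLogs(),
// reads the resulting stream until EOF, and hands each raw chunk to send().
func forwardPodLogs(ctx context.Context, kube kubernetes.Interface, req logRequest, send func([]byte) error) error {
    opts := &corev1.PodLogOptions{Container: req.Container, Follow: req.Follow}
    rc, err := kube.CoreV1().Pods(req.Namespace).GetLogs(req.PodName, opts).Stream(ctx)
    if err != nil {
        return err
    }
    defer rc.Close()

    buf := make([]byte, 32*1024)
    for {
        n, readErr := rc.Read(buf)
        if n > 0 {
            if err := send(buf[:n]); err != nil {
                return err
            }
        }
        if readErr != nil {
            if errors.Is(readErr, io.EOF) {
                return nil // clean end of the log stream
            }
            return readErr
        }
    }
}

In this variant the agent does no timestamp bookkeeping at all; resume and chunk handling would be left entirely to the Argo CD server side.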
@coderabbitai full review
✅ Actions performed: full review triggered.
Walkthrough
Adds container log streaming: new ContainerLogRequest event and payload, agent-side static and resumable live log workflows with in-flight dedupe, a client-streaming gRPC LogStream API and server with HTTP writer registration, tests/mocks, e2e log retrieval helpers, and a backoff dependency.
Sequence Diagram(s)
sequenceDiagram
autonumber
participant Client as HTTP Client
participant Principal as Principal (HTTP + LogStream gRPC)
participant Agent as Agent
participant K8s as Kubernetes API
Client->>Principal: GET /resources/{ns}/{pod}/log?requestUUID&follow
Principal->>Principal: RegisterHTTP(requestUUID) → create session & writer
Principal->>Agent: Emit ContainerLogRequest event (requestUUID, follow,...)
Agent->>Agent: startLogStreamIfNew(requestUUID) (dedupe)
alt Static (follow=false)
Agent->>K8s: PodLogs(follow=false, options)
K8s-->>Agent: log bytes, then EOF
Agent->>Principal: Stream gRPC chunks (data + EOF)
Principal->>Client: write bytes, flush, close
else Live (follow=true)
Agent->>K8s: PodLogs(follow=true, sinceTime/tail)
loop resume & retry (backoff)
K8s-->>Agent: incremental log bytes (with timestamps)
Agent->>Principal: Stream gRPC chunks
Principal->>Client: write & flush bytes
end
end
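As a rough illustration of the principal-side "write bytes, flush" step in the diagram above, a sketch assuming the writer registered for the requestUUID is an http.ResponseWriter; the helper name is hypothetical, not the PR's actual API:

import "net/http"

// writeChunk writes one log chunk to the registered HTTP response and flushes
// it immediately so the Argo CD UI sees lines as they arrive rather than only
// after the whole response completes.
func writeChunk(w http.ResponseWriter, data []byte) error {
    if _, err := w.Write(data); err != nil {
        return err
    }
    if f, ok := w.(http.Flusher); ok {
        f.Flush()
    }
    return nil
}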
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches: 🧪 Generate unit tests (beta)
Actionable comments posted: 4
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (3)
go.sum is excluded by !**/*.sum
pkg/api/grpc/logstreamapi/logstream.pb.go is excluded by !**/*.pb.go
pkg/api/grpc/logstreamapi/logstream_grpc.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (15)
agent/agent.go (1 hunks), agent/inbound.go (1 hunks), agent/log.go (1 hunks), agent/log_test.go (1 hunks), go.mod (1 hunks), internal/event/event.go (5 hunks), principal/apis/logstream/logstream.go (1 hunks), principal/apis/logstream/logstream.proto (1 hunks), principal/apis/logstream/logstream_test.go (1 hunks), principal/apis/logstream/mock/mock.go (1 hunks), principal/listen.go (2 hunks), principal/resource.go (2 hunks), principal/server.go (3 hunks), test/e2e/fixture/argoclient.go (1 hunks), test/e2e/logs_test.go (1 hunks)
🧰 Additional context used
🪛 Buf (1.59.0)
principal/apis/logstream/logstream.proto
17-17: Files with package "logstreamapi" must be within a directory "logstreamapi" relative to root but were in directory "principal/apis/logstream".
(PACKAGE_DIRECTORY_MATCH)
Signed-off-by: Mangaal <[email protected]>
…og-streaming Signed-off-by: Mangaal <[email protected]>
Actionable comments posted: 1
♻️ Duplicate comments (1)
principal/apis/logstreamapi/logstream.proto (1)
17-20: Fix Buf PACKAGE_DIRECTORY_MATCH failure.
buf lint still reports PACKAGE_DIRECTORY_MATCH because the proto package logstreamapi must live directly under a logstreamapi directory relative to the module root, while the current path principal/apis/logstream still violates that rule. Adjust either the package declaration (e.g. package principal.apis.logstreamapi;) or the directory/module configuration so the package path and filesystem layout align; otherwise, Buf-based lint/codegen will continue to fail. Based on static analysis hints.
🧹 Nitpick comments (1)
principal/apis/logstreamapi/logstream.go (1)
289-293: Lower the per-chunk log level to avoid flooding principal logs. Writing an Info entry for every chunk sent to the HTTP client will spam logs during live streams and adds measurable overhead. Please drop this log or demote it to Trace, keeping high-volume traffic out of the default log level.
- logCtx.WithFields(logrus.Fields{
-     "data_length": len(data),
-     "request_id":  reqID,
- }).Info("HTTP write and flush successful")
+ logCtx.WithFields(logrus.Fields{
+     "data_length": len(data),
+     "request_id":  reqID,
+ }).Trace("HTTP write and flush successful")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
go.sum is excluded by !**/*.sum
📒 Files selected for processing (12)
agent/agent.go (2 hunks), agent/inbound.go (1 hunks), agent/log.go (1 hunks), agent/log_test.go (1 hunks), go.mod (1 hunks), internal/event/event.go (5 hunks), principal/apis/logstreamapi/logstream.go (1 hunks), principal/apis/logstreamapi/logstream.proto (1 hunks), principal/apis/logstreamapi/logstream_test.go (1 hunks), principal/apis/logstreamapi/mock/mock.go (1 hunks), principal/server.go (3 hunks), test/e2e/fixture/argoclient.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- go.mod
- agent/agent.go
🧰 Additional context used
🧬 Code graph analysis (6)
principal/apis/logstreamapi/logstream_test.go (2)
- principal/apis/logstreamapi/logstream.go (1): NewServer (66-71)
- principal/apis/logstreamapi/mock/mock.go (4): NewMockHTTPResponseWriter (119-123), MockWriterWithoutFlusher (168-172), NewMockLogStreamServer (41-46), PanicFlusher (161-161)
principal/server.go (1)
- principal/apis/logstreamapi/logstream.go (2): Server (32-36), NewServer (66-71)
principal/apis/logstreamapi/logstream.go (2)
- principal/server.go (2): Server (71-164), NewServer (183-388)
- pkg/api/grpc/logstreamapi/logstream_grpc.pb.go (1): LogStreamService_StreamLogsServer (104-108)
agent/log_test.go (5)
- principal/apis/logstreamapi/mock/mock.go (2): MockLogStreamServer (30-39), NewMockLogStreamServer (41-46)
- pkg/api/grpc/logstreamapi/logstream.pb.go (6): LogStreamData (24-37), LogStreamData (52-52), LogStreamData (67-69), LogStreamResponse (100-109), LogStreamResponse (124-124), LogStreamResponse (139-141)
- agent/agent.go (1): Agent (62-116)
- test/fake/kube/kubernetes.go (1): NewKubernetesFakeClientWithResources (69-78)
- internal/event/event.go (1): ContainerLogRequest (959-973)
agent/inbound.go (1)
- internal/event/event.go (1): TargetContainerLog (85-85)
agent/log.go (4)
- agent/agent.go (1): Agent (62-116)
- internal/event/event.go (1): ContainerLogRequest (959-973)
- pkg/api/grpc/logstreamapi/logstream.pb.go (3): LogStreamData (24-37), LogStreamData (52-52), LogStreamData (67-69)
- pkg/api/grpc/logstreamapi/logstream_grpc.pb.go (1): LogStreamService_StreamLogsClient (46-50)
🪛 Buf (1.59.0)
principal/apis/logstreamapi/logstream.proto
17-17: Files with package "logstreamapi" must be within a directory "logstreamapi" relative to root but were in directory "principal/apis/logstreamapi".
(PACKAGE_DIRECTORY_MATCH)
Signed-off-by: Mangaal <[email protected]>
Signed-off-by: Mangaal <[email protected]>
Actionable comments posted: 2
🧹 Nitpick comments (2)
agent/log.go (2)
238-246: Consider propagating actual error details to the principal. Line 244 sends a generic error message "log stream read failed" to the principal. Sending the actual error message (e.g., err.Error()) would aid debugging on the principal/UI side, similar to how line 119 and line 294 already do this for other error paths. Apply this diff if you want more specific error messages:
  if errors.Is(err, io.EOF) {
      _ = stream.Send(&logstreamapi.LogStreamData{RequestUuid: logReq.UUID, Eof: true})
      return nil
  }
  logCtx.WithError(err).Error("Error reading log stream")
- _ = stream.Send(&logstreamapi.LogStreamData{RequestUuid: logReq.UUID, Error: "log stream read failed"})
+ _ = stream.Send(&logstreamapi.LogStreamData{RequestUuid: logReq.UUID, Error: err.Error()})
  return err
268-272: Consider extracting the hardcoded overlap duration to a named constant. The -100ms overlap at line 270 ensures no log lines are lost during resume, which is important. However, it's hardcoded and lacks explanation. Extracting it to a named constant would improve readability and make the intent clear. For example, add this constant near line 251:
const (
    waitForReconnect = 10 * time.Second       // how long we poll IsConnected() after Unauthenticated
    pollEvery        = 1 * time.Second
    resumeOverlap    = 100 * time.Millisecond // overlap to prevent losing log lines during resume
)
Then update line 270:
  resumeReq := *logReq
  if lastTimestamp != nil {
-     t := lastTimestamp.Add(-100 * time.Millisecond)
+     t := lastTimestamp.Add(-resumeOverlap)
      resumeReq.SinceTime = t.Format(time.RFC3339)
  }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
agent/log.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
agent/log.go (4)
- agent/agent.go (1): Agent (62-116)
- internal/event/event.go (1): ContainerLogRequest (959-973)
- pkg/api/grpc/logstreamapi/logstream.pb.go (3): LogStreamData (24-37), LogStreamData (52-52), LogStreamData (67-69)
- pkg/api/grpc/logstreamapi/logstream_grpc.pb.go (2): LogStreamService_StreamLogsClient (46-50), NewLogStreamServiceClient (33-35)
🔇 Additional comments (10)
agent/log.go (10)
17-34: LGTM! The imports are well-organized and all dependencies are appropriate for the log streaming functionality.
36-58: LGTM! Clean entry point with appropriate error handling and structured logging.
60-97: LGTM! The duplicate detection and lifecycle management logic is correct. The cleanup function properly invokes cancel() and removes the inflight entry, addressing the critical issues from previous reviews.
148-165: LGTM! The goroutine correctly defers cleanup() at line 156, ensuring the inflight entry is always released when the live stream ends. The panic handler only logs the panic, relying on the deferred cleanup for resource management; this is the correct pattern.
167-176: LGTM! Simple and correct helper for creating the gRPC log stream.
178-201: LGTM! The Kubernetes log stream configuration is correct. Setting Timestamps to true is essential for the resume capability, and the SinceTime parsing properly supports retry logic.
282-286: Verify that the initial empty data message is necessary. The code sends an empty LogStreamData message at the start of each attempt for "health checks" (lines 282-286). While the comment explains this, it's worth confirming whether the principal actually requires this initial message or if it could be omitted to simplify the protocol. If the principal doesn't strictly require this empty message, consider removing it to simplify the streaming flow.
322-346: LGTM, but consider simplifying the reconnection wait loop if possible. The authentication failure handling is sophisticated and correct: it pauses exponential backoff and waits for reconnection by polling IsConnected(). The manual ticker loop (lines 327-344) works correctly, though it could potentially be simplified. Given the complexity is warranted for reliable auth handling, this is acceptable as-is.
402-408: Verify EOF handling is intentional for live streaming. The code does NOT send an EOF message when rc.Read() returns io.EOF (lines 402-408), instead treating it as an error. In contrast, streamLogsToCompletion (line 239) handles EOF as clean termination. For live streaming with follow=true, is it intentional to treat EOF as an error and retry? This might be related to the intermittent EOF errors reported in the PR comments. If the Kubernetes stream returns EOF prematurely (e.g., during reconnection), treating it as an error would trigger a retry with the resume logic, which seems correct. However, sending err.Error() (which would be "EOF") to the principal as an error message might surface in the UI. Consider adding explicit EOF handling to distinguish between expected EOF (if any) and unexpected errors:
if err != nil {
    if errors.Is(err, io.EOF) {
        // For follow=true, EOF from k8s might indicate temporary disruption
        // Let the resume logic handle it
        logCtx.Info("EOF from k8s stream, will resume")
        return lastTimestamp, err
    }
    logCtx.WithError(err).Error("Error reading log stream")
    _ = stream.Send(&logstreamapi.LogStreamData{RequestUuid: logReq.UUID, Error: err.Error()})
    return lastTimestamp, err
}
412-438: LGTM! The timestamp extraction logic is well-optimized with appropriate guards and early exits. Trying both RFC3339Nano and RFC3339 formats ensures compatibility with Kubernetes timestamp variations.
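For reference, parsing the timestamp that Kubernetes prepends to each line (when PodLogOptions.Timestamps is true) in the way described above could look roughly like this; a sketch, not the PR's exact extractTimestamp:

import (
    "strings"
    "time"
)

// parseLineTimestamp extracts the leading per-line timestamp, trying
// RFC3339Nano first and falling back to RFC3339.
func parseLineTimestamp(line string) (time.Time, bool) {
    idx := strings.IndexAny(line, " \t")
    if idx <= 0 {
        return time.Time{}, false
    }
    token := line[:idx]
    for _, layout := range []string{time.RFC3339Nano, time.RFC3339} {
        if ts, err := time.Parse(layout, token); err == nil {
            return ts, true
        }
    }
    return time.Time{}, false
}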
Signed-off-by: Mangaal <[email protected]>
Actionable comments posted: 0
♻️ Duplicate comments (1)
agent/log.go (1)
99-146: Remove redundant defer rc.Close() at line 123. The rc.Close() is deferred at both line 123 and inside streamLogsToCompletion at line 213, resulting in a double-close. While most io.ReadCloser implementations handle this gracefully, calling Close() twice is a code smell and not guaranteed safe across all implementations. Apply this diff to remove the redundant defer:
  // Create Kubernetes log stream
  rc, err := a.createKubernetesLogStream(ctx, logReq)
  if err != nil {
      _ = stream.Send(&logstreamapi.LogStreamData{RequestUuid: logReq.UUID, Eof: true, Error: err.Error()})
      _, _ = stream.CloseAndRecv()
      return err
  }
- defer rc.Close()
  err = a.streamLogsToCompletion(ctx, stream, rc, logReq, logCtx)
🧹 Nitpick comments (2)
agent/log.go (2)
280-286: Clarify the purpose of the initial empty message. The comment "Used for health checks" may be misleading. This initial message appears to establish the stream connection rather than perform a health check. Consider clarifying the comment to accurately reflect its purpose (e.g., "Send initial message to establish stream connection").
383-392: Consider adding a comment to explain the timestamp extraction logic. The logic for extracting the timestamp from the last complete line in the buffer is correct but somewhat dense. A brief comment explaining why we search backwards for the last complete line (to enable resume from the most recent known timestamp) would improve maintainability.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (2)
pkg/api/grpc/logstreamapi/logstream.pb.go is excluded by !**/*.pb.go
pkg/api/grpc/logstreamapi/logstream_grpc.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (4)
agent/log.go (1 hunks), hack/generate-proto.sh (1 hunks), principal/apis/logstreamapi/logstream.proto (1 hunks), principal/server.go (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- principal/server.go
🧰 Additional context used
🧬 Code graph analysis (1)
agent/log.go (4)
- agent/agent.go (1): Agent (62-116)
- internal/event/event.go (1): ContainerLogRequest (959-973)
- pkg/api/grpc/logstreamapi/logstream.pb.go (3): LogStreamData (38-51), LogStreamData (66-66), LogStreamData (81-83)
- pkg/api/grpc/logstreamapi/logstream_grpc.pb.go (1): LogStreamService_StreamLogsClient (46-50)
🔇 Additional comments (7)
hack/generate-proto.sh (1)
26-26: Proto code generation target addition looks good. The new logstreamapi entry follows the established pattern and will be processed identically to existing API targets by the generation loop.
principal/apis/logstreamapi/logstream.proto (2)
17-19: Proto package-to-directory alignment appears to resolve the prior Buf lint issue. The file location principal/apis/logstreamapi/logstream.proto now correctly matches the declared package principal.apis.logstreamapi and go_package option, satisfying Buf's PACKAGE_DIRECTORY_MATCH rule that was flagged in the previous review.
22-44: Proto schema structure aligns with unidirectional client-streaming design. LogStreamData messages (with request_uuid correlation, data payload, eof marker, and error field) enable the agent to push log chunks to the principal, which the principal writes directly to HTTP response streams. The single LogStreamResponse returned after stream closure provides final status and statistics. The StreamLogs RPC's client-streaming semantics correctly implement the agent→principal push model described in the PR objectives.
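To make that client-streaming shape concrete, a hedged sketch of the agent-side call sequence against the generated logstreamapi package; the import path and the exact generated method signatures (StreamLogs, Send, CloseAndRecv) are assumed from the file layout and the usual protoc-gen-go-grpc output, and Data is assumed to be a bytes field:

import (
    "context"

    "google.golang.org/grpc"

    // Import path assumed from pkg/api/grpc/logstreamapi in this repository.
    "github.com/argoproj-labs/argocd-agent/pkg/api/grpc/logstreamapi"
)

// pushLogs opens the client-streaming StreamLogs RPC, sends data chunks
// correlated by requestUUID, marks the end with Eof, and closes the stream
// to receive the single summary response from the principal.
func pushLogs(ctx context.Context, conn *grpc.ClientConn, requestUUID string, chunks [][]byte) (*logstreamapi.LogStreamResponse, error) {
    client := logstreamapi.NewLogStreamServiceClient(conn)
    stream, err := client.StreamLogs(ctx)
    if err != nil {
        return nil, err
    }
    for _, c := range chunks {
        if err := stream.Send(&logstreamapi.LogStreamData{RequestUuid: requestUUID, Data: c}); err != nil {
            return nil, err
        }
    }
    if err := stream.Send(&logstreamapi.LogStreamData{RequestUuid: requestUUID, Eof: true}); err != nil {
        return nil, err
    }
    // The response carries the final status and lines-received statistics.
    return stream.CloseAndRecv()
}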
agent/log.go (4)
60-97: LGTM: Cleanup and duplicate detection logic is correct. The in-flight tracking with cleanup function properly calls cancel() before deleting the map entry, and the separation between static (with deferred cleanup) and live streaming (with cleanup passed to goroutine) correctly handles resource lifecycle for both paths.
148-165: LGTM: Live streaming goroutine properly defers cleanup. The goroutine correctly defers cleanup() at the top, ensuring the in-flight entry and context cancellation are released when streaming ends, regardless of how it terminates (success, error, or panic).
250-360: LGTM: Resume logic with exponential backoff is well-designed. The error handling properly distinguishes between terminal errors (Canceled/NotFound), authentication errors requiring reconnection wait, and transient errors handled with exponential backoff. The 100ms timestamp overlap prevents log loss during resume (a rough sketch of this pattern follows after this list).
411-437: LGTM: Timestamp extraction is robust and efficient. The function properly guards against edge cases (short lines, missing spaces, long tokens) and handles both RFC3339 and RFC3339Nano formats. The fast-path checks prevent unnecessary parsing attempts.
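A stripped-down sketch of the resume-with-backoff pattern referenced above, assuming a streamOnce helper that returns the timestamp of the last forwarded line; the names and the simple doubling backoff are illustrative (the PR uses a backoff library and additional terminal-error checks):

import (
    "context"
    "errors"
    "time"
)

// streamWithResume retries a live log stream after transient failures,
// resuming slightly before the last seen timestamp so no lines are lost.
func streamWithResume(ctx context.Context, streamOnce func(ctx context.Context, since *time.Time) (*time.Time, error)) error {
    const resumeOverlap = 100 * time.Millisecond
    var since *time.Time
    delay := time.Second

    for {
        last, err := streamOnce(ctx, since)
        if err == nil || errors.Is(err, context.Canceled) {
            return err // clean completion or intentional stop
        }
        if last != nil {
            t := last.Add(-resumeOverlap) // re-read a small overlap to avoid gaps
            since = &t
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(delay):
        }
        if delay < 30*time.Second {
            delay *= 2 // capped exponential backoff
        }
    }
}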
Signed-off-by: Mangaal <[email protected]>
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
agent/log.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
agent/log.go (4)
- agent/agent.go (1): Agent (62-116)
- internal/event/event.go (1): ContainerLogRequest (959-973)
- pkg/api/grpc/logstreamapi/logstream.pb.go (3): LogStreamData (38-51), LogStreamData (66-66), LogStreamData (81-83)
- pkg/api/grpc/logstreamapi/logstream_grpc.pb.go (1): LogStreamService_StreamLogsClient (46-50)
Signed-off-by: Mangaal <[email protected]>
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
agent/log.go (1 hunks), agent/log_test.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
agent/log_test.go (5)
- principal/apis/logstreamapi/mock/mock.go (2): MockLogStreamServer (30-39), NewMockLogStreamServer (41-46)
- pkg/api/grpc/logstreamapi/logstream.pb.go (6): LogStreamData (38-51), LogStreamData (66-66), LogStreamData (81-83), LogStreamResponse (114-123), LogStreamResponse (138-138), LogStreamResponse (153-155)
- agent/agent.go (1): Agent (62-116)
- test/fake/kube/kubernetes.go (1): NewKubernetesFakeClientWithResources (69-78)
- internal/event/event.go (1): ContainerLogRequest (959-973)
agent/log.go (4)
- agent/agent.go (1): Agent (62-116)
- internal/event/event.go (1): ContainerLogRequest (959-973)
- pkg/api/grpc/logstreamapi/logstream.pb.go (3): LogStreamData (38-51), LogStreamData (66-66), LogStreamData (81-83)
- pkg/api/grpc/logstreamapi/logstream_grpc.pb.go (2): LogStreamService_StreamLogsClient (46-50), NewLogStreamServiceClient (33-35)
agent/agent.go (Outdated)
  clusterCache *appstatecache.Cache

  inflightMu sync.Mutex
  inflightLogs map[string]context.CancelFunc
Two things:
- Can you add a description of what this field is used for
- AFAICT the value (cancel func) is never called. Is this intentional?
I have added this to block duplicate log requests, and I have added a comment.
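Roughly, the duplicate-request guard described here follows this shape; a sketch with hypothetical names, not the PR's exact fields:

import (
    "context"
    "sync"
)

// inflight tracks one cancel func per request UUID so a second request for
// the same UUID can be rejected while a stream is still running.
type inflight struct {
    mu   sync.Mutex
    logs map[string]context.CancelFunc
}

// begin registers uuid and returns a derived context plus a cleanup that
// cancels it and frees the entry; ok is false if uuid is already in flight.
func (i *inflight) begin(parent context.Context, uuid string) (ctx context.Context, cleanup func(), ok bool) {
    i.mu.Lock()
    defer i.mu.Unlock()
    if i.logs == nil {
        i.logs = make(map[string]context.CancelFunc)
    }
    if _, exists := i.logs[uuid]; exists {
        return nil, nil, false
    }
    ctx, cancel := context.WithCancel(parent)
    i.logs[uuid] = cancel
    cleanup = func() {
        i.mu.Lock()
        defer i.mu.Unlock()
        cancel()
        delete(i.logs, uuid)
    }
    return ctx, cleanup, true
}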
Signed-off-by: Mangaal <[email protected]>
Actionable comments posted: 0
🧹 Nitpick comments (2)
agent/log_test.go (2)
83-93: Hardcoded UUID in mock response may cause test issues. The CloseAndRecv method returns a hardcoded RequestUuid of "test-uuid", but test requests use "test-uuid-123" (line 156). While this doesn't break current tests that don't validate the response UUID, it could lead to confusion or false positives if future tests verify the response. Consider storing the request UUID in the mock and returning it:
  type MockLogStreamClient struct {
      *mock.MockLogStreamServer
      sentData []*logstreamapi.LogStreamData
      sendFunc func(data *logstreamapi.LogStreamData) error
+     requestUuid string
      mu sync.RWMutex
  }

- func NewMockLogStreamClient(ctx context.Context) *MockLogStreamClient {
+ func NewMockLogStreamClient(ctx context.Context, requestUuid string) *MockLogStreamClient {
      return &MockLogStreamClient{
          MockLogStreamServer: mock.NewMockLogStreamServer(ctx),
          sentData:            make([]*logstreamapi.LogStreamData, 0),
+         requestUuid:         requestUuid,
          sendFunc:            func(data *logstreamapi.LogStreamData) error { return nil },
      }
  }

  func (m *MockLogStreamClient) CloseAndRecv() (*logstreamapi.LogStreamResponse, error) {
      // Simulate closing and receiving a response
      m.mu.RLock()
      linesReceived := len(m.sentData)
      m.mu.RUnlock()
      return &logstreamapi.LogStreamResponse{
-         RequestUuid:   "test-uuid",
+         RequestUuid:   m.requestUuid,
          Status:        200,
          LinesReceived: int32(linesReceived),
      }, nil
  }
275-283: Test relies on panic, which indicates incomplete setup. The test expects a panic when processing a new request, which suggests missing dependencies in the test agent. This makes the test fragile and doesn't properly validate the happy path. Consider providing a complete mock setup so the test can verify the actual behavior without relying on panics:
  t.Run("new request", func(t *testing.T) {
      logReq := createTestLogRequest(false)
-     agent := createTestAgent()
-     // This will panic due to missing dependencies, but we can check if new request is processed
-     assert.Panics(t, func() {
-         agent.startLogStreamIfNew(logReq, logCtx)
-     })
+     agent := createTestAgentWithKubeClient()
+     // Provide mock remote connection, or test the specific behavior you want to validate
+     // rather than relying on panic
+     err := agent.startLogStreamIfNew(logReq, logCtx)
+     // Add appropriate assertions based on expected behavior
  })
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
go.sum is excluded by !**/*.sum
📒 Files selected for processing (6)
agent/agent.go (2 hunks), agent/inbound.go (1 hunks), agent/log.go (1 hunks), agent/log_test.go (1 hunks), go.mod (1 hunks), principal/server.go (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- go.mod
- agent/agent.go
- principal/server.go
🧰 Additional context used
🧬 Code graph analysis (3)
agent/inbound.go (1)
- internal/event/event.go (1): TargetContainerLog (85-85)
agent/log.go (5)
- agent/agent.go (1): Agent (62-117)
- internal/event/event.go (1): ContainerLogRequest (959-973)
- internal/logging/logfields/logfields.go (1): UUID (30-30)
- pkg/api/grpc/logstreamapi/logstream.pb.go (3): LogStreamData (38-51), LogStreamData (66-66), LogStreamData (81-83)
- pkg/api/grpc/logstreamapi/logstream_grpc.pb.go (1): LogStreamService_StreamLogsClient (46-50)
agent/log_test.go (7)
- principal/apis/logstreamapi/mock/mock.go (2): MockLogStreamServer (30-39), NewMockLogStreamServer (41-46)
- pkg/api/grpc/logstreamapi/logstream.pb.go (6): LogStreamData (38-51), LogStreamData (66-66), LogStreamData (81-83), LogStreamResponse (114-123), LogStreamResponse (138-138), LogStreamResponse (153-155)
- internal/logging/logfields/logfields.go (3): Status (130-130), UUID (30-30), Name (59-59)
- agent/agent.go (1): Agent (62-117)
- test/fake/kube/kubernetes.go (1): NewKubernetesFakeClientWithResources (69-78)
- internal/event/event.go (2): ContainerLogRequest (959-973), Create (56-56)
- principal/apis/logstreamapi/logstream_test.go (1): TestStreamLogs (100-216)
🔇 Additional comments (13)
agent/inbound.go (1)
77-78: LGTM! Container log routing added correctly. The new event routing for TargetContainerLog follows the established pattern and integrates cleanly with the existing switch statement.
agent/log_test.go (4)
114-127: LGTM! Simple and effective test helper. The MockReadCloser implementation is clean and provides useful test functionality.
166-210: LGTM! Comprehensive timestamp extraction tests. The test cases cover the important timestamp formats and edge cases.
287-383: LGTM! Comprehensive streaming tests. The tests effectively validate both successful streaming and error conditions, including cancellation and send failures.
249-256: Verify test expectation for non-existent pod. The test expects NoError when requesting logs for a non-existent pod. This behavior should be verified against the actual createKubernetesLogStream implementation and the fake Kubernetes client configuration to determine if the assertion is correct or if an error should be expected.
agent/log.go (8)
36-58: LGTM! Clean entry point with proper context. The log request processing entry point is well-structured with appropriate logging context and error handling.
60-91: LGTM! Duplicate detection and cleanup properly implemented. The inflight log management correctly handles both static and live streaming cases, with appropriate cleanup for cancellation and map entry removal.
93-132: LGTM! Static log handling with appropriate error management. The static log handling correctly manages stream creation, error handling, and authentication failures.
134-148: LGTM! Live streaming with proper cleanup and early ACK. The live streaming goroutine correctly defers cleanup to ensure resources are released when the stream ends, and returns early ACK to prevent timeout.
150-181: LGTM! Stream creation with proper option handling. Both gRPC and Kubernetes stream creation are correctly implemented with appropriate option transfer and error handling.
183-231: LGTM! Complete static streaming with proper resource management. The streaming implementation correctly handles EOF as normal termination, closes the ReadCloser via defer, and checks for context cancellation before blocking reads.
233-333: LGTM! Robust resume logic with appropriate backoff strategy. The resume implementation correctly handles different error scenarios: intentional stops, authentication failures with reconnection wait, and transient errors with exponential backoff.
335-418: LGTM! Streaming with timestamp extraction and proper resource cleanup. The streaming implementation correctly closes the ReadCloser, treats EOF as successful termination, and extracts timestamps from log lines for resume capability. The timestamp extraction handles common RFC3339 variants efficiently.
Signed-off-by: Mangaal <[email protected]>
Actionable comments posted: 0
🧹 Nitpick comments (4)
agent/log_test.go (4)
83-93: Hardcoded UUID in CloseAndRecv reduces test fidelity. The RequestUuid field is hardcoded to "test-uuid" instead of using the actual UUID from the sent data. This could mask bugs where the wrong UUID is used. Apply this diff to derive the UUID from sent data:
  func (m *MockLogStreamClient) CloseAndRecv() (*logstreamapi.LogStreamResponse, error) {
      // Simulate closing and receiving a response
      m.mu.RLock()
+     requestUUID := "test-uuid"
+     if len(m.sentData) > 0 {
+         requestUUID = m.sentData[0].RequestUuid
+     }
      linesReceived := len(m.sentData)
      m.mu.RUnlock()
      return &logstreamapi.LogStreamResponse{
-         RequestUuid:   "test-uuid",
+         RequestUuid:   requestUUID,
          Status:        200,
          LinesReceived: int32(linesReceived),
      }, nil
  }
273-280: Consider testing actual behavior instead of panic. The test verifies that startLogStreamIfNew panics when dependencies are missing. This tests an implementation detail rather than behavior. Consider injecting minimal dependencies (e.g., a mock stream client) to test that the function actually initiates log streaming for new requests.
294-307: Clarify the assertion expectation. The comment states "At least 2 data messages + EOF" but the assertion GreaterOrEqual(t, len(sentData), 2) is ambiguous; it's unclear whether the count includes or excludes the EOF message. Consider either being more explicit in the comment or adding a separate assertion for the EOF message. Apply this diff to make the expectation clearer:
  // Verify that data was sent
  sentData := mockStream.GetSentData()
- assert.GreaterOrEqual(t, len(sentData), 2) // At least 2 data messages + EOF
+ require.GreaterOrEqual(t, len(sentData), 1, "Expected at least one message")
  // Check that the last message is EOF
  lastMessage := sentData[len(sentData)-1]
  assert.True(t, lastMessage.Eof)
328-355: Misleading assertion message. Line 350 states "Should end with EOF, not timeout", but when streamErr is nil, it means the stream completed successfully, not that it encountered an EOF error. The message is confusing because EOF is a normal termination condition, not an error. Apply this diff to clarify:
- assert.Nil(t, streamErr, "Should end with EOF, not timeout")
+ assert.Nil(t, streamErr, "Stream should complete successfully")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
agent/log_test.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
agent/log_test.go (6)
- principal/apis/logstreamapi/mock/mock.go (2): MockLogStreamServer (30-39), NewMockLogStreamServer (41-46)
- pkg/api/grpc/logstreamapi/logstream.pb.go (6): LogStreamData (38-51), LogStreamData (66-66), LogStreamData (81-83), LogStreamResponse (114-123), LogStreamResponse (138-138), LogStreamResponse (153-155)
- agent/agent.go (1): Agent (62-117)
- test/fake/kube/kubernetes.go (1): NewKubernetesFakeClientWithResources (69-78)
- internal/event/event.go (2): ContainerLogRequest (959-973), Create (56-56)
- principal/apis/logstreamapi/logstream_test.go (1): TestStreamLogs (100-216)
🔇 Additional comments (5)
agent/log_test.go (5)
114-128: LGTM! Clean test double with proper close tracking for verification.
129-163: LGTM! Test helpers properly initialize required fields for agent and log request testing.
165-210: LGTM! Comprehensive timestamp extraction test covering multiple RFC3339 variants and edge cases.
356-379: LGTM! Excellent test with error injection via SetSendFunc. This verifies that both the timestamp and error are properly returned when send fails, and confirms that at least one send attempt was made.
249-256: Verify the NoError assertion for non-existent pod requests. The test asserts NoError when requesting logs for a non-existent pod. Typically, such requests should fail. Confirm whether this behavior is intentional: either the fake Kubernetes client returns no error for missing pods, or the assertion should expect an error instead.
Signed-off-by: Mangaal <[email protected]>
Actionable comments posted: 1
🧹 Nitpick comments (6)
agent/log_test.go (6)
118-131: MockReadCloser is fine; only add sync if used across goroutines. MockReadCloser is a simple and useful test double. If you ever start asserting IsClosed() from a different goroutine than the one calling Close(), consider wrapping closed with a mutex or atomic.Bool to avoid races under the -race detector. As-is, for single-goroutine use, it's perfectly adequate.
133-167: Agent test helpers are clear and keep setup DRY. createTestAgent/createTestAgentWithKubeClient/createTestLogRequest nicely encapsulate common setup and keep the tests focused on behavior rather than wiring. One optional improvement: accept *testing.T and call t.Helper() plus t.Cleanup(cancel) so the contexts are always canceled even if a test fails early.
216-259: Strengthen non-existent pod expectations in TestCreateKubernetesLogStream. The "non-existent pod" subtest only checks that no error is returned; it doesn't confirm that no stream is opened. If createKubernetesLogStream ever started returning a non-nil ReadCloser for 404s, this test wouldn't catch it.
  t.Run("Test createKubernetesLogStream with non-existent pod", func(t *testing.T) {
      agent := createTestAgentWithKubeClient()
      // Test with a non-existent pod
      logReqNotFound := createTestLogRequest(false)
      logReqNotFound.PodName = "non-existent-pod"
-     _, err := agent.createKubernetesLogStream(ctx, logReqNotFound)
-     assert.NoError(t, err)
+     rc, err := agent.createKubernetesLogStream(ctx, logReqNotFound)
+     assert.NoError(t, err)
+     assert.Nil(t, rc, "expected no log stream for non-existent pod")
  })
261-282: Avoid relying on a panic to assert "new request" behavior. In the "new request" subtest you intentionally expect startLogStreamIfNew to panic due to missing dependencies. That makes the test brittle: once you wire in the remaining dependencies or refactor the function to fail more gracefully, the test will start failing even though behavior is improved. A more robust pattern is:
- Construct an Agent with the minimal mocks so startLogStreamIfNew can run without panicking.
- Assert that the request UUID is added to inflightLogs and that no error is returned (or that a specific, well-defined error is returned).
284-314: Use predicate-style error assertion for context cancellation. The cancellation subtest currently asserts assert.Equal(t, context.Canceled, err). This ties the test to an exact error value; if streamLogsToCompletion ever wraps the context error (while still preserving context.Canceled as the cause), the behavior would be acceptable but the test would fail. Switching to a predicate-style assertion will keep the test robust to error wrapping while still ensuring you surface cancellations correctly.
- err := agent.streamLogsToCompletion(cancelCtx, mockStream, reader, logReq, logCtx)
- assert.Error(t, err)
- assert.Equal(t, context.Canceled, err)
+ err := agent.streamLogsToCompletion(cancelCtx, mockStream, reader, logReq, logCtx)
+ assert.Error(t, err)
+ // allow for potential error wrapping while still enforcing cancellation semantics
+ assert.ErrorIs(t, err, context.Canceled)
316-362: Isolate fixtures per subtest and relax overly strict data expectations in TestStreamLogs. Two small test-quality nits here:
- agent and logReq are shared across subtests. If streamLogs mutates agent state or the request (e.g., setting SinceTime), the second subtest may observe side effects from the first. Creating fresh fixtures per t.Run keeps tests independent.
- In the "send failure" subtest, assert.Equal(t, testData, string(sentData[0].Data)) couples the test tightly to the current flush/chunking strategy (single send with the full buffer). If you later switch to line-based or size-based chunking, the behavior would still be semantically correct but this assertion would fail.
You can address both with a small refactor:
-func TestStreamLogs(t *testing.T) {
-    agent := createTestAgentWithKubeClient()
-    ctx := context.Background()
-    logReq := createTestLogRequest(true)
-    logCtx := logrus.NewEntry(logrus.New())
+func TestStreamLogs(t *testing.T) {
+    ctx := context.Background()
+    logCtx := logrus.NewEntry(logrus.New())
@@
-    t.Run("successful streaming with data verification", func(t *testing.T) {
+    t.Run("successful streaming with data verification", func(t *testing.T) {
+        agent := createTestAgentWithKubeClient()
+        logReq := createTestLogRequest(true)
@@
-    t.Run("send failure returns timestamp and error", func(t *testing.T) {
+    t.Run("send failure returns timestamp and error", func(t *testing.T) {
+        agent := createTestAgentWithKubeClient()
+        logReq := createTestLogRequest(true)
@@
-        sentData := mockStream.GetSentData()
-        require.Greater(t, len(sentData), 0, "expected a send attempt before failure")
-        assert.Equal(t, testData, string(sentData[0].Data))
+        sentData := mockStream.GetSentData()
+        require.Greater(t, len(sentData), 0, "expected a send attempt before failure")
+        // Loosen expectation to avoid coupling to exact chunking strategy
+        assert.Contains(t, string(sentData[0].Data), "line 1", "expected at least the first log line to be sent before failure")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
agent/log_test.go (1 hunks)
🔇 Additional comments (2)
agent/log_test.go (2)
169-214: TestExtractTimestamp covers key formats and edge cases well. The table-driven tests exercise nanoseconds, milliseconds, plain seconds, missing timestamps, and a tab separator, which gives good confidence in extractTimestamp. The timePtr helper keeps expectations readable. No issues here from my side.
364-367: timePtr helper is simple and appropriate. Tiny helper, but it keeps expected timestamps readable in the tests above. No changes needed here.
Signed-off-by: Mangaal <[email protected]>
Actionable comments posted: 0
🧹 Nitpick comments (2)
agent/log_test.go (2)
117-119: Optional: Add mutex protection to SetSendFunc. While typical test usage sets this during setup before concurrent access, adding mutex protection would make the mock more robust against potential race conditions with Send (line 63).
  func (m *MockLogStreamClient) SetSendFunc(fn func(data *logstreamapi.LogStreamData) error) {
+     m.mu.Lock()
+     defer m.mu.Unlock()
      m.sendFunc = fn
  }
277-284: Optional: Consider testing with proper setup instead of expecting a panic. Testing for panics typically indicates missing setup rather than validating intended behavior. Consider either:
- Properly mocking/setting up the required dependencies (e.g., gRPC stream, logger)
- Removing this test if it's not adding meaningful coverage
If the panic is intentional behavior for missing dependencies, consider documenting why this is the expected design.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
agent/log_test.go (1 hunks)
🔇 Additional comments (12)
agent/log_test.go (12)
14-36: LGTM! The package declaration and imports are well-organized and appropriate for the test scenarios covered in this file.
38-56: LGTM! The mock client structure and constructor are well-designed for testing purposes, with sensible defaults and proper initialization.
86-100: LGTM! Previous panic concern resolved. The CloseAndRecv implementation now safely handles the case when no data has been sent by defaulting to m.requestID and only accessing m.sentData[0] after verifying the slice is non-empty.
121-134: LGTM! The MockReadCloser provides a clean, minimal implementation for testing with a helpful IsClosed() method for verification.
137-170: LGTM! The test helper functions provide clean, reusable fixtures for setting up test scenarios.
172-217: LGTM! The test provides comprehensive coverage of various timestamp formats and edge cases using a clean table-driven approach.
224-253: LGTM! The test correctly sets up a pod in the fake Kubernetes client and verifies that log stream creation succeeds with proper cleanup.
267-276: LGTM! The test correctly verifies that duplicate requests are handled gracefully by returning early without error.
287-317: LGTM! Both test cases properly verify the streaming behavior: successful completion with EOF marker and proper context cancellation handling.
319-365: LGTM! Both test cases thoroughly validate the streaming behavior:
- Successful streaming with timestamp extraction
- Proper error handling while preserving the last captured timestamp
The use of ErrorIs and verification of sent data demonstrates good testing practices.
367-370: LGTM! Simple, clean helper function for creating time pointers in test assertions.
254-261: Verify expected behavior for non-existent pod. The test expects NoError when requesting logs for a non-existent pod. This requires verification of whether createKubernetesLogStream intentionally handles missing pods gracefully, or if the fake Kubernetes client is masking expected errors that would occur in a real cluster. Confirm the intended behavior and adjust the test assertion accordingly.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
go.sum is excluded by !**/*.sum
📒 Files selected for processing (1)
go.mod (1 hunks)
This PR introduces a unidirectional (agent → principal) log streaming service and wires it into the resource-proxy path so the Principal can serve Kubernetes pod logs to the Argo CD UI. The Agent handles both static logs (follow=false) and live streaming (follow=true) with resume support.
What’s included:
Key feature:
Assisted-by: Cursor/Gemini etc
logs.mov
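For context, a hypothetical client-side view of the resource-proxy path shown in the walkthrough diagram; the URL layout and query parameters are taken from that diagram and may not match the final route exactly:

import (
    "context"
    "fmt"
    "io"
    "net/http"
    "os"
)

// tailPodLogs requests a followed log stream from the principal's resource
// proxy and copies the chunks to stdout as they are flushed.
func tailPodLogs(ctx context.Context, principalURL, namespace, pod, requestUUID string) error {
    url := fmt.Sprintf("%s/resources/%s/%s/log?requestUUID=%s&follow=true", principalURL, namespace, pod, requestUUID)
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
    if err != nil {
        return err
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    _, err = io.Copy(os.Stdout, resp.Body)
    return err
}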