Improve apps logs streaming helpers #3908
base: main
Conversation
pkosiec
left a comment
Thank you @ericfeunekes for your contribution! The new app logs command is super helpful. I tested the changes and it works well 👍
Before merging, please take a look at my comments. I can help you implement the changes if you'd like - feel free to ping me anytime. Thanks!
cmd/auth/token.go
Outdated
```go
		OAuthArgument: oauthArgument,
		Host:          args.authArguments.Host,
		AccountID:     args.authArguments.AccountID,
```
If we already pass args.authArguments (which consists of Host and AccountID properties), perhaps it makes sense to:
- pass the whole struct `args.authArguments` as an input argument
- do `oauthArgument, err := args.authArguments.ToOAuthArgument()` inside `auth.AcquireToken()`

In that way we'll reduce 3 input args to just one. Does it make sense?
auth.AcquireTokenRequest now carries the entire auth.AuthArguments struct and does the ToOAuthArgument conversion internally, so CLI callers don’t need to juggle host/account IDs or duplicate that logic. cmd/auth token and the new apps logs command just pass the struct plus profile/timeout metadata.
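For readers skimming the thread, here is a minimal sketch of the shape described above (imports omitted like the other snippets in this thread; names are taken from the discussion and the trailing logic is elided, so treat it as an illustration rather than the final code):

```go
// Sketch only: AcquireTokenRequest/AuthArguments/ToOAuthArgument are the names
// used in this thread; the actual implementation may differ in detail.
type AcquireTokenRequest struct {
	AuthArguments      *AuthArguments
	ProfileName        string
	PersistentAuthOpts []u2m.PersistentAuthOption
}

func AcquireToken(ctx context.Context, req AcquireTokenRequest) (*oauth2.Token, error) {
	// The Host/AccountID -> OAuthArgument conversion now happens here,
	// so callers only pass the AuthArguments struct.
	oauthArgument, err := req.AuthArguments.ToOAuthArgument()
	if err != nil {
		return nil, err
	}

	allOpts := append([]u2m.PersistentAuthOption{}, req.PersistentAuthOpts...)
	allOpts = append(allOpts, u2m.WithOAuthArgument(oauthArgument))
	persistentAuth, err := u2m.NewPersistentAuth(ctx, allOpts...)
	if err != nil {
		return nil, err
	}

	// ... the rest of the existing token load / refresh / error-hint logic,
	// unchanged, using persistentAuth ...
	_ = persistentAuth
	return nil, nil
}
```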
cmd/auth/token_test.go
Outdated
```go
	}
}

func TestLoadTokenDoesNotMutatePersistentAuthOpts(t *testing.T) {
```
If I understood correctly, this test verifies those 3 lines:

```go
allOpts := append([]u2m.PersistentAuthOption{}, req.PersistentAuthOpts...)
allOpts = append(allOpts, u2m.WithOAuthArgument(req.OAuthArgument))
persistentAuth, err := u2m.NewPersistentAuth(ctx, allOpts...)
```

Is that correct? So it effectively tests the AcquireToken function, and just that specific part. I'm not really sure if there's enough value in such a separate unit test 🤔 I'll ask for a second opinion, but for now we can keep it; however, let's move it to libs/auth/token_loader_test.go 👍
```go
			validateToken: validateToken,
		},
	}
	for _, c := range cases {
```
Shouldn't we move the loadToken tests to libs/auth/token_loader_test.go, and keep just a single happy path for the loadToken itself? Now the core logic is in the AcquireToken and basically that's what the loadToken unit tests test.
docs/logstream_SKILL.md
Outdated
I'm not sure if this is the best place to have this skill definition - what do you think about moving it to .claude/skills/logstream.md?
Also, do you have any use case for the CLI that could use your skill? I'm wondering if we need it at all 🤔
I think the comment above was missed, please move this file to .claude/skills/logstream.md 🙏
cmd/workspace/apps/logs.go
Outdated
```go
	oauthArg, err := authArgs.ToOAuthArgument()
	if err != nil {
		return err
	}
```
(As noted in the first comment, once it is applied, this would be moved to the auth.AcquireTokenRequest)
libs/logstream/streamer.go
Outdated
```go
		var closeErr *websocket.CloseError
		if errors.As(err, &closeErr) {
			switch closeErr.Code {
			case 4401, 4403:
```
Could you define consts for the error codes please?
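For example (values taken from the case arm above; the constant names are just a proposal), so the switch can read `case closeCodeUnauthorized, closeCodeForbidden:`:

```go
// Suggested constants for the WebSocket close codes used above.
const (
	closeCodeUnauthorized = 4401
	closeCodeForbidden    = 4403
)
```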
libs/logstream/streamer.go
Outdated
```go
		if err := waitForBackoff(ctx, timer, backoff); err != nil {
			return err
		}
		backoff = min(backoff*2, 5*time.Second)
```
Please add 5*time.Second to consts 👍
```go
	userAgent string
}

func (s *logStreamer) Run(ctx context.Context) error {
```
Can we split the Run and consume methods to increase readability please? 🙏
libs/logstream/streamer.go
Outdated
```go
		backoff = time.Second
		err = s.consume(ctx, conn)
		_ = conn.Close()
```
The refactoring (splitting the method into multiple ones) would be helpful, as then we could follow best practices and add `defer conn.Close()` right after a successful connection:

```go
conn, resp, err := s.dialer.DialContext(ctx, s.url, headers)
if err != nil {
	// err handling
}
defer conn.Close() // <-- here
```

and also, within the same function, we could move the goroutine:

```go
go func() {
	select {
	case <-ctx.Done():
		_ = conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "context canceled"), time.Now().Add(time.Second))
		_ = conn.Close()
	case <-closeCh:
	}
}()
```
Also, currently the connection is closed twice, and in case of a timed-out context the second call would return an error (connection already closed). Could we use sync.Once to solve it?
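A possible shape for that, as a sketch (the conn/closeCh names come from the snippet above; the sync.Once usage is the suggestion, not the current code):

```go
// Close the connection at most once, no matter which path gets there first.
var closeOnce sync.Once
closeConn := func() {
	closeOnce.Do(func() {
		_ = conn.Close()
	})
}
defer closeConn()

go func() {
	select {
	case <-ctx.Done():
		_ = conn.WriteControl(websocket.CloseMessage,
			websocket.FormatCloseMessage(websocket.CloseNormalClosure, "context canceled"),
			time.Now().Add(time.Second))
		closeConn()
	case <-closeCh:
	}
}()
```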
```go
	userAgent string
}

func (s *logStreamer) Run(ctx context.Context) error {
```
I'd add a comment that this method isn't thread-safe, just to make sure no one runs a single logStreamer instance concurrently 👍
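For example, something along these lines as the doc comment (wording is just a suggestion):

```go
// Run connects to the log stream and consumes messages until ctx is canceled
// or a fatal error occurs. It is not safe for concurrent use: each
// logStreamer instance must only be run from a single goroutine.
func (s *logStreamer) Run(ctx context.Context) error {
```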
This reverts commit 402e72743d9a9bd6dbdc7c24f1f20d62cededf11.
@pkosiec I think I've addressed each of your issues and tried to keep them on separate commits to make it easier to review.
pkosiec
left a comment
libs/auth/token_loader_test.go
Outdated
```go
func TestAcquireTokenRefreshFailureWithProfileShowsLoginHint(t *testing.T) {
	_, err := AcquireToken(context.Background(), AcquireTokenRequest{
		AuthArguments: &AuthArguments{
			Host:      "https://accounts.cloud.databricks.com",
			AccountID: "expired",
		},
		ProfileName: "expired",
		PersistentAuthOpts: []u2m.PersistentAuthOption{
			u2m.WithTokenCache(newTokenCache()),
			u2m.WithOAuthEndpointSupplier(&mockOAuthEndpointSupplier{}),
			u2m.WithHttpClient(&http.Client{Transport: fixtures.SliceTransport{refreshFailureTokenResponse}}),
		},
	})
	require.EqualError(t, err, `A new access token could not be retrieved because the refresh token is invalid. To reauthenticate, run the following command:
$ databricks auth login --profile expired`)
}

func TestAcquireTokenRefreshFailureWithHostShowsLoginHint(t *testing.T) {
	_, err := AcquireToken(context.Background(), AcquireTokenRequest{
		AuthArguments: &AuthArguments{
			Host:      "https://accounts.cloud.databricks.com",
			AccountID: "expired",
		},
		PersistentAuthOpts: []u2m.PersistentAuthOption{
			u2m.WithTokenCache(newTokenCache()),
			u2m.WithOAuthEndpointSupplier(&mockOAuthEndpointSupplier{}),
			u2m.WithHttpClient(&http.Client{Transport: fixtures.SliceTransport{refreshFailureTokenResponse}}),
		},
	})
	require.EqualError(t, err, `A new access token could not be retrieved because the refresh token is invalid. To reauthenticate, run the following command:
$ databricks auth login --host https://accounts.cloud.databricks.com --account-id expired`)
}

func TestAcquireTokenInvalidRefreshResponseShowsHelp(t *testing.T) {
	_, err := AcquireToken(context.Background(), AcquireTokenRequest{
		AuthArguments: &AuthArguments{
			Host:      "https://accounts.cloud.databricks.com",
			AccountID: "active",
		},
		ProfileName: "active",
		PersistentAuthOpts: []u2m.PersistentAuthOption{
			u2m.WithTokenCache(newTokenCache()),
			u2m.WithOAuthEndpointSupplier(&mockOAuthEndpointSupplier{}),
			u2m.WithHttpClient(&http.Client{Transport: fixtures.SliceTransport{refreshFailureInvalidResponse}}),
		},
	})
	require.EqualError(t, err, "token refresh: oauth2: cannot parse json: invalid character 'N' looking for beginning of value. Try logging in again with `databricks auth login --profile active` before retrying. If this fails, please report this issue to the Databricks CLI maintainers at https://github.com/databricks/cli/issues/new")
}

func TestAcquireTokenOtherErrorShowsHelp(t *testing.T) {
	_, err := AcquireToken(context.Background(), AcquireTokenRequest{
		AuthArguments: &AuthArguments{
			Host:      "https://accounts.cloud.databricks.com",
			AccountID: "active",
		},
		ProfileName: "active",
		PersistentAuthOpts: []u2m.PersistentAuthOption{
			u2m.WithTokenCache(newTokenCache()),
			u2m.WithOAuthEndpointSupplier(&mockOAuthEndpointSupplier{}),
			u2m.WithHttpClient(&http.Client{Transport: fixtures.SliceTransport{refreshFailureOtherError}}),
		},
	})
	require.EqualError(t, err, "token refresh: Databricks is down (error code: other_error). Try logging in again with `databricks auth login --profile active` before retrying. If this fails, please report this issue to the Databricks CLI maintainers at https://github.com/databricks/cli/issues/new")
}

func TestAcquireTokenSuccessReturnsAccessToken(t *testing.T) {
	token, err := AcquireToken(context.Background(), AcquireTokenRequest{
		AuthArguments: &AuthArguments{
			Host:      "https://accounts.cloud.databricks.com",
			AccountID: "active",
		},
		ProfileName: "active",
		PersistentAuthOpts: []u2m.PersistentAuthOption{
			u2m.WithTokenCache(newTokenCache()),
			u2m.WithOAuthEndpointSupplier(&mockOAuthEndpointSupplier{}),
			u2m.WithHttpClient(&http.Client{Transport: fixtures.SliceTransport{refreshSuccessTokenResponse}}),
		},
	})
	require.NoError(t, err)
	assert.Equal(t, "new-access-token", token.AccessToken)
	assert.Equal(t, "Bearer", token.TokenType)
}
```
Those are moved tests from the token_test.go, correct? Could you please keep them defined as table tests? It's much easier to maintain such cases in table test format. Thanks!
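For example, a rough sketch of the table-test shape (case bodies abbreviated; the request construction and exact expected strings would come from the tests above, so treat this as an illustration rather than the final code):

```go
func TestAcquireToken(t *testing.T) {
	cases := []struct {
		name      string
		req       AcquireTokenRequest
		wantErr   string
		wantToken string
	}{
		{
			name: "refresh failure with profile shows login hint",
			// req: built with the "expired" profile and refreshFailureTokenResponse
			wantErr: "A new access token could not be retrieved because the refresh token is invalid.",
		},
		// ... one entry per scenario above, including the success case ...
	}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			token, err := AcquireToken(context.Background(), c.req)
			if c.wantErr != "" {
				require.ErrorContains(t, err, c.wantErr)
				return
			}
			require.NoError(t, err)
			assert.Equal(t, c.wantToken, token.AccessToken)
		})
	}
}
```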
libs/auth/token_loader.go
Outdated
```go
var persistentAuthFactory = func(ctx context.Context, opts ...u2m.PersistentAuthOption) (persistentAuth, error) {
	return u2m.NewPersistentAuth(ctx, opts...)
}
```
If I understand correctly, this is defined purely for a single unit test that checks if the PersistentAuthOpts aren't mutated, correct?
I'd get rid of this as we don't need to do that. Please check my next comment for more details 👍
libs/auth/token_loader_test.go
Outdated
```go
	defaultFactory := persistentAuthFactory
	t.Cleanup(func() {
		persistentAuthFactory = defaultFactory
	})

	opts := []u2m.PersistentAuthOption{
		func(pa *u2m.PersistentAuth) {},
		func(pa *u2m.PersistentAuth) {},
	}
	initialLen := len(opts)
	factoryCalls := 0

	persistentAuthFactory = func(ctx context.Context, providedOpts ...u2m.PersistentAuthOption) (persistentAuth, error) {
		factoryCalls++
		require.Len(t, providedOpts, initialLen+1)
		require.Len(t, opts, initialLen) // original slice must not change
		return &fakePersistentAuth{
			token: &oauth2.Token{
				AccessToken: "token",
			},
		}, nil
	}
```
I'd just skip that part, and use the mockOAuthEndpointSupplier, and then check the length of the opts, without counting the calls etc. 👍 No need for it IMO.
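In other words, roughly (reusing the helpers already present in this test file; a sketch, not the exact code):

```go
opts := []u2m.PersistentAuthOption{
	u2m.WithTokenCache(newTokenCache()),
	u2m.WithOAuthEndpointSupplier(&mockOAuthEndpointSupplier{}),
	u2m.WithHttpClient(&http.Client{Transport: fixtures.SliceTransport{refreshSuccessTokenResponse}}),
}
_, err := AcquireToken(context.Background(), AcquireTokenRequest{
	AuthArguments: &AuthArguments{
		Host:      "https://accounts.cloud.databricks.com",
		AccountID: "active",
	},
	ProfileName:        "active",
	PersistentAuthOpts: opts,
})
require.NoError(t, err)
require.Len(t, opts, 3) // the caller's slice must not be mutated
```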
docs/logstream_SKILL.md
Outdated
I think the comment above was missed, please move this file to .claude/skills/logstream.md 🙏
libs/logstream/streamer.go
Outdated
```go
const handshakeErrorBodyLimit = 4 * 1024
const defaultUserAgent = "databricks-cli logstream"
const initialReconnectBackoff = 200 * time.Millisecond
const maxReconnectBackoff = 5 * time.Second
const closeCodeUnauthorized = 4401
const closeCodeForbidden = 4403
```
Suggested change:

```go
const (
	handshakeErrorBodyLimit = 4 * 1024
	defaultUserAgent        = "databricks-cli logstream"
	initialReconnectBackoff = 200 * time.Millisecond
	maxReconnectBackoff     = 5 * time.Second
	closeCodeUnauthorized   = 4401
	closeCodeForbidden      = 4403
)
```
libs/logstream/streamer.go
Outdated
```go
	}

	backoff := initialReconnectBackoff
	timer := time.NewTimer(time.Hour)
```
We stop the timer immediately in the next line - shouldn't we have just var timer time.Timer?
libs/logstream/streamer.go
Outdated
```go
	stopWatch := watchContext(ctx, conn)
	defer stopWatch()
	defer conn.Close()
```
Do I understand correctly that we don't need to do that as the stopWatch will close the connection? If so, then we can remove that line (that was my original intention of one of the previous comments - ensure the connection is closed only once)
```go
	return line
}

type tailBuffer struct {
```
nit (for later): Later (in a follow-up PR) we could move tailBuffer, formatting-related helpers to separate files. But let's do that later 👍
libs/logstream/streamer.go
Outdated
```go
func stopTimer(timer *time.Timer) {
	if timer == nil {
		return
	}
	if !timer.Stop() {
		drainTimer(timer)
	}
}

func resetTimer(timer *time.Timer, d time.Duration) {
	if timer == nil {
		return
	}
	if !timer.Stop() {
		drainTimer(timer)
	}
	timer.Reset(d)
}
```
Those functions look super similar - can't we have just one, stopTimer? resetTimer is called only once and it could use stopTimer plus another line, timer.Reset(d).
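In other words, roughly (keeping the existing nil check and drainTimer helper):

```go
func stopTimer(timer *time.Timer) {
	if timer == nil {
		return
	}
	if !timer.Stop() {
		drainTimer(timer)
	}
}

func resetTimer(timer *time.Timer, d time.Duration) {
	if timer == nil {
		return
	}
	stopTimer(timer)
	timer.Reset(d)
}
```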
cmd/workspace/apps/apps.go
Outdated
```go
	cmd.AddCommand(newStop())
	cmd.AddCommand(newUpdate())
	cmd.AddCommand(newUpdatePermissions())
	cmd.AddCommand(newLogsCommand())
```
Apologies, I know I put a comment to add the command as a part of the file but now I realized it is generated from API files in one of our internal repositories.
Could we please revert the changes in ffe6e3d? 🙏 I can do that, if you'd like to 👍 Thank you!
(For a later follow-up PR)
@fjakobs @pietern The /logz and /logz/stream endpoints don't seem to be part of the public API. Should we keep it as it is (after the revert of the mentioned commit) or should I include the endpoints in our proto files in the universe repo?
You can move this into overrides.go in the same directory.
See cmdOverrides.
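A rough sketch of what that could look like, assuming the cmdOverrides hook that the generated apps.go applies (exact names may differ in the actual file):

```go
// cmd/workspace/apps/overrides.go (sketch)
package apps

import "github.com/spf13/cobra"

func logsCommandOverride(cmd *cobra.Command) {
	cmd.AddCommand(newLogsCommand())
}

func init() {
	// cmdOverrides is assumed to be the slice of override functions that
	// New() applies to the generated apps command.
	cmdOverrides = append(cmdOverrides, logsCommandOverride)
}
```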
@pkosiec The endpoints are not public APIs at the platform level, they are specific to the apps runtime.
That's why they are not documented in the platform level API docs.
pietern
left a comment
Hi @ericfeunekes, thanks for the PR!
The token changes (cmd/auth/token_* and libs/auth/token_*) are not necessary. You can use the TokenSource function on the SDK configuration directly to get a fresh OAuth token. I confirmed that the following patch works: https://gist.github.com/pietern/41d02acc2907416244789c7408fb232f
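As a sketch of that approach (GetTokenSource is the method referenced in the follow-up commit below; the exact Token signature on the returned source is an assumption here, so check the gist for the authoritative version):

```go
// cfg is the resolved SDK *config.Config for the workspace.
ts := cfg.GetTokenSource()
tok, err := ts.Token(ctx) // assumed signature; returns a fresh OAuth token
if err != nil {
	return err
}
headers.Set("Authorization", "Bearer "+tok.AccessToken)
```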
Please move logstream to libs/apps to make it clearer that it is specific to apps.
Btw, when I stop an app during tail, the logs command doesn't stop or error.
@ericfeunekes I forgot an important detail in the previous message. Contributing to the CLI requires a signed CLA; if that's something you're willing to do, please reach out with a request to sign the CLA to [email protected] and we will take it from there. Thanks!
Replace custom token acquisition logic with SDK's built-in TokenSource function as suggested by @pietern. This simplifies the implementation by delegating token lifecycle management to the SDK configuration.

Changes:
- Use cfg.GetTokenSource() instead of auth.AcquireToken()
- Remove custom token_loader abstraction (libs/auth/token_loader.go)
- Revert cmd/auth/token.go to original implementation
- Remove tokenAcquireTimeout constant (handled by SDK)

This addresses review feedback from @pietern on Nov 17, 2025.
Reference: https://gist.github.com/pietern/41d02acc2907416244789c7408fb232f
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <[email protected]>

Relocate logstream from libs/logstream to libs/apps/logstream to make it clearer that this package is specific to Databricks Apps functionality.

Changes:
- Move libs/logstream/streamer.go to libs/apps/logstream/streamer.go
- Move libs/logstream/streamer_test.go to libs/apps/logstream/streamer_test.go
- Update import in cmd/workspace/apps/logs.go

This addresses review feedback from @pietern on Nov 17, 2025.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <[email protected]>

Group individual const declarations into a single const block for better code organization and readability.

Changes:
- Convert six individual const declarations to a const block
- Apply alignment formatting

This addresses review feedback from @pkosiec on Nov 14, 2025.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <[email protected]>
When using --tail-lines with -f (follow), the buffer was flushing too early, showing all historical logs instead of just the last N lines.

The tail buffer maintains a rolling window of the last N lines, but it was flushing as soon as it reached N lines when following, rather than waiting for the prefetch window to collect all historical logs first.

Fix: Always respect the flush deadline (prefetch window) regardless of follow mode. This allows historical logs to stream in and the rolling buffer to maintain only the truly last N lines before flushing.

Example with --tail-lines 10 -f:
- Before: shows lines 1-10, then 11-1000+ (all logs)
- After: shows lines 991-1000, then new logs (correct)

This addresses review feedback from @pkosiec on Nov 14, 2025.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <[email protected]>
Replace magic number (30 seconds) with a defaultHandshakeTimeout constant for better code maintainability.

Changes:
- Add defaultHandshakeTimeout = 30 * time.Second to the const block
- Use the constant in newLogStreamDialer instead of the inline value

This addresses review feedback from @pkosiec on Nov 12/14, 2025.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <[email protected]>
When following app logs (--follow) and the app stops or is deleted, the logs command now detects this and exits gracefully instead of retrying indefinitely.

Implementation:
- Added an AppStatusChecker callback to logstream.Config
- Check app status before each reconnect attempt in follow mode
- Also check when the connection closes normally during follow
- Exit with a clear error message when the app is in a stopped/deleting/error state

This addresses @pietern's comment about the command not stopping when the app is stopped during tail.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <[email protected]>
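A hypothetical sketch of the AppStatusChecker idea from that last commit (the field name comes from the commit message; the actual Config shape and call sites may differ):

```go
type Config struct {
	// ... existing fields ...

	// AppStatusChecker is called before each reconnect attempt in follow mode.
	// It returns a non-nil error when the app is stopped, deleting, or in an
	// error state, which makes the streamer exit instead of retrying.
	AppStatusChecker func(ctx context.Context) error
}

// checkAppStatus is a hypothetical helper invoked before reconnecting while following.
func (s *logStreamer) checkAppStatus(ctx context.Context) error {
	if s.cfg.AppStatusChecker == nil {
		return nil
	}
	return s.cfg.AppStatusChecker(ctx)
}
```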
An authorized user can trigger integration tests manually by following the instructions below:
Trigger:
Inputs:
Checks will be approved automatically on success.

Changes
- A new `databricks apps logs NAME` command, including tail/follow/search/source/output-file flags wired via `cmdgroup`, file mirroring with 0600 perms, and validation against apps that lack a public URL. (cmd/workspace/apps)
- A `libs/logstream` helper with token refresh hooks, buffering, search/source filtering, structured error handling, context-driven deadlines, and a comprehensive unit suite so other commands can stream logs without bespoke WebSocket loops.
- Refactored `databricks auth token` to reuse `auth.AcquireToken` via the new `libs/auth/token_loader.go`, ensuring persistent auth options and timeouts stay centralized and fully covered by regression tests.

Why
This is a quality-of-life addition that lets the CLI tail app logs directly. It hooks into the token acquisition module so that tokens don't need to be managed separately.
Tests