Skip to content

Conversation

@ericfeunekes
Copy link

Changes

  • Add the databricks apps logs NAME command, including tail/follow/search/source/output-file flags wired via cmdgroup, file mirroring with 0600 perms, and validation against apps that lack a public URL. (cmd/workspace/apps)
  • Introduce the reusable libs/logstream helper with token refresh hooks, buffering, search/source filtering, structured error handling, context-driven deadlines, and a comprehensive unit suite so other commands can stream logs without bespoke WebSocket loops.
  • Refactor databricks auth token to reuse auth.AcquireToken via the new libs/auth/token_loader.go, ensuring persistent auth options and timeouts stay centralized and fully covered by regression tests.

Why

This is a quality of life addition that allows the CLI to tail logs to the CLI. It hooks directly into the token acquisition module so that the tokens don't need to be managed separately.

Tests

  • go test ./libs/logstream
  • go test ./cmd/workspace/apps
  • go test ./cmd/auth

Copy link
Member

@pkosiec pkosiec left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @ericfeunekes for your contribution! The new app logs command is super helpful. I tested the changes and it works well 👍

Before merge, please take a look at my comments. I can help you implementing the changes if you'd like to - feel free to ping me anytime. Thanks!

Comment on lines 104 to 106
OAuthArgument: oauthArgument,
Host: args.authArguments.Host,
AccountID: args.authArguments.AccountID,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we already pass args.authArguments (which consists of Host and AccountID properties), perhaps it makes sense to:

  • pass the whole struct args.authArguments as an input argument
  • do oauthArgument, err := args.authArguments.ToOAuthArgument() inside the auth.AcquireToken()

In that way we'll reduce 3 input args to just one. Does it make sense?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

auth.AcquireTokenRequest now carries the entire auth.AuthArguments struct and does the ToOAuthArgument conversion internally, so CLI callers don’t need to juggle host/account IDs or duplicate that logic. cmd/auth token and the new apps logs command just pass the struct plus profile/timeout metadata.

}
}

func TestLoadTokenDoesNotMutatePersistentAuthOpts(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understood correctly, this test verifies those 3 lines:

allOpts := append([]u2m.PersistentAuthOption{}, req.PersistentAuthOpts...)
allOpts = append(allOpts, u2m.WithOAuthArgument(req.OAuthArgument))
persistentAuth, err := u2m.NewPersistentAuth(ctx, allOpts...)

Is that correct? So it effectively tests the AcquireToken function, and just that specific part. I'm not really sure if there's enough value in such separate unit test 🤔 I'll ask for a second opinion, but for now we can keep it, however let's move it to libs/auth/token_loader_test.go 👍

validateToken: validateToken,
},
}
for _, c := range cases {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we move the loadToken tests to libs/auth/token_loader_test.go, and keep just a single happy path for the loadToken itself? Now the core logic is in the AcquireToken and basically that's what the loadToken unit tests test.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this is the best place to have this skill definition - what do you think about moving it to .claude/skills/logstream.md?

Also, do you have any use case for the CLI that could use your skill? I'm wondering if we need it at all 🤔

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the comment above was missed, please move this file to .claude/skills/logstream.md 🙏

Comment on lines 86 to 90
oauthArg, err := authArgs.ToOAuthArgument()
if err != nil {
return err
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(As noted in the first comment, once it is applied, this would be moved to the auth.AcquireTokenRequest)

var closeErr *websocket.CloseError
if errors.As(err, &closeErr) {
switch closeErr.Code {
case 4401, 4403:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you define consts for the error codes please?

if err := waitForBackoff(ctx, timer, backoff); err != nil {
return err
}
backoff = min(backoff*2, 5*time.Second)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add 5*time.Second to consts 👍

userAgent string
}

func (s *logStreamer) Run(ctx context.Context) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we split the Run and consume methods to increase readability please? 🙏


backoff = time.Second
err = s.consume(ctx, conn)
_ = conn.Close()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The refactoring (splitting the method to multiple ones) would be helpful as then we could follow the best practices and add defer conn.Close() right after a successful connection:

        conn, resp, err := s.dialer.DialContext(ctx, s.url, headers)
		if err != nil {
			// err handling
		}
        defer conn.Close() // <-- here

and also, within the same function, we could move the goroutine:

	go func() {
		select {
		case <-ctx.Done():
			_ = conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "context canceled"), time.Now().Add(time.Second))
			_ = conn.Close()
		case <-closeCh:
		}
	}()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, currently the connection is closed two times and in case of timeouted context, second call would return an error (connection already closed). Could we use sync.Once to solve it?

userAgent string
}

func (s *logStreamer) Run(ctx context.Context) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd add a comment that this method isn't thread-safe, just to make sure no one runs a single logStreamer instance concurrently 👍

@ericfeunekes
Copy link
Author

ericfeunekes commented Nov 13, 2025

@pkosiec I think I've addressed each of your issues and tried to keep them on separate commits to make it easier to review

@pkosiec pkosiec self-assigned this Nov 14, 2025
Copy link
Member

@pkosiec pkosiec left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a lot for implementing the comments, including the logs color formatting!

Image

We're almost there - I put some minor comments. There's one small issue with -f and -tail-lines flags, but other than that I don't see any major blockers 👍

Comment on lines 125 to 206
func TestAcquireTokenRefreshFailureWithProfileShowsLoginHint(t *testing.T) {
_, err := AcquireToken(context.Background(), AcquireTokenRequest{
AuthArguments: &AuthArguments{
Host: "https://accounts.cloud.databricks.com",
AccountID: "expired",
},
ProfileName: "expired",
PersistentAuthOpts: []u2m.PersistentAuthOption{
u2m.WithTokenCache(newTokenCache()),
u2m.WithOAuthEndpointSupplier(&mockOAuthEndpointSupplier{}),
u2m.WithHttpClient(&http.Client{Transport: fixtures.SliceTransport{refreshFailureTokenResponse}}),
},
})
require.EqualError(t, err, `A new access token could not be retrieved because the refresh token is invalid. To reauthenticate, run the following command:
$ databricks auth login --profile expired`)
}

func TestAcquireTokenRefreshFailureWithHostShowsLoginHint(t *testing.T) {
_, err := AcquireToken(context.Background(), AcquireTokenRequest{
AuthArguments: &AuthArguments{
Host: "https://accounts.cloud.databricks.com",
AccountID: "expired",
},
PersistentAuthOpts: []u2m.PersistentAuthOption{
u2m.WithTokenCache(newTokenCache()),
u2m.WithOAuthEndpointSupplier(&mockOAuthEndpointSupplier{}),
u2m.WithHttpClient(&http.Client{Transport: fixtures.SliceTransport{refreshFailureTokenResponse}}),
},
})
require.EqualError(t, err, `A new access token could not be retrieved because the refresh token is invalid. To reauthenticate, run the following command:
$ databricks auth login --host https://accounts.cloud.databricks.com --account-id expired`)
}

func TestAcquireTokenInvalidRefreshResponseShowsHelp(t *testing.T) {
_, err := AcquireToken(context.Background(), AcquireTokenRequest{
AuthArguments: &AuthArguments{
Host: "https://accounts.cloud.databricks.com",
AccountID: "active",
},
ProfileName: "active",
PersistentAuthOpts: []u2m.PersistentAuthOption{
u2m.WithTokenCache(newTokenCache()),
u2m.WithOAuthEndpointSupplier(&mockOAuthEndpointSupplier{}),
u2m.WithHttpClient(&http.Client{Transport: fixtures.SliceTransport{refreshFailureInvalidResponse}}),
},
})
require.EqualError(t, err, "token refresh: oauth2: cannot parse json: invalid character 'N' looking for beginning of value. Try logging in again with `databricks auth login --profile active` before retrying. If this fails, please report this issue to the Databricks CLI maintainers at https://github.com/databricks/cli/issues/new")
}

func TestAcquireTokenOtherErrorShowsHelp(t *testing.T) {
_, err := AcquireToken(context.Background(), AcquireTokenRequest{
AuthArguments: &AuthArguments{
Host: "https://accounts.cloud.databricks.com",
AccountID: "active",
},
ProfileName: "active",
PersistentAuthOpts: []u2m.PersistentAuthOption{
u2m.WithTokenCache(newTokenCache()),
u2m.WithOAuthEndpointSupplier(&mockOAuthEndpointSupplier{}),
u2m.WithHttpClient(&http.Client{Transport: fixtures.SliceTransport{refreshFailureOtherError}}),
},
})
require.EqualError(t, err, "token refresh: Databricks is down (error code: other_error). Try logging in again with `databricks auth login --profile active` before retrying. If this fails, please report this issue to the Databricks CLI maintainers at https://github.com/databricks/cli/issues/new")
}

func TestAcquireTokenSuccessReturnsAccessToken(t *testing.T) {
token, err := AcquireToken(context.Background(), AcquireTokenRequest{
AuthArguments: &AuthArguments{
Host: "https://accounts.cloud.databricks.com",
AccountID: "active",
},
ProfileName: "active",
PersistentAuthOpts: []u2m.PersistentAuthOption{
u2m.WithTokenCache(newTokenCache()),
u2m.WithOAuthEndpointSupplier(&mockOAuthEndpointSupplier{}),
u2m.WithHttpClient(&http.Client{Transport: fixtures.SliceTransport{refreshSuccessTokenResponse}}),
},
})
require.NoError(t, err)
assert.Equal(t, "new-access-token", token.AccessToken)
assert.Equal(t, "Bearer", token.TokenType)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those are moved tests from the token_test.go, correct? Could you please keep them defined as table tests? It's much easier to maintain such cases in table test format. Thanks!

Comment on lines 19 to 21
var persistentAuthFactory = func(ctx context.Context, opts ...u2m.PersistentAuthOption) (persistentAuth, error) {
return u2m.NewPersistentAuth(ctx, opts...)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, this is defined purely for a single unit test that checks if the PersistentAuthOpts aren't mutated, correct?

I'd get rid of this as we don't need to do that. Please check my next comment for more details 👍

Comment on lines 32 to 53
defaultFactory := persistentAuthFactory
t.Cleanup(func() {
persistentAuthFactory = defaultFactory
})

opts := []u2m.PersistentAuthOption{
func(pa *u2m.PersistentAuth) {},
func(pa *u2m.PersistentAuth) {},
}
initialLen := len(opts)
factoryCalls := 0

persistentAuthFactory = func(ctx context.Context, providedOpts ...u2m.PersistentAuthOption) (persistentAuth, error) {
factoryCalls++
require.Len(t, providedOpts, initialLen+1)
require.Len(t, opts, initialLen) // original slice must not change
return &fakePersistentAuth{
token: &oauth2.Token{
AccessToken: "token",
},
}, nil
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd just skip that part, and use the mockOAuthEndpointSupplier, and then check the length of the opts, without counting the calls etc. 👍 No need for it IMO.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the comment above was missed, please move this file to .claude/skills/logstream.md 🙏

Comment on lines 20 to 25
const handshakeErrorBodyLimit = 4 * 1024
const defaultUserAgent = "databricks-cli logstream"
const initialReconnectBackoff = 200 * time.Millisecond
const maxReconnectBackoff = 5 * time.Second
const closeCodeUnauthorized = 4401
const closeCodeForbidden = 4403
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const handshakeErrorBodyLimit = 4 * 1024
const defaultUserAgent = "databricks-cli logstream"
const initialReconnectBackoff = 200 * time.Millisecond
const maxReconnectBackoff = 5 * time.Second
const closeCodeUnauthorized = 4401
const closeCodeForbidden = 4403
const (
handshakeErrorBodyLimit = 4 * 1024
defaultUserAgent = "databricks-cli logstream"
initialReconnectBackoff = 200 * time.Millisecond
maxReconnectBackoff = 5 * time.Second
closeCodeUnauthorized = 4401
closeCodeForbidden = 4403
)

}

backoff := initialReconnectBackoff
timer := time.NewTimer(time.Hour)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We stop the timer immediately in the next line - shouldn't we have just var timer time.Timer?


stopWatch := watchContext(ctx, conn)
defer stopWatch()
defer conn.Close()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I understand correctly that we don't need to do that as the stopWatch will close the connection? If so, then we can remove that line (that was my original intention of one of the previous comments - ensure the connection is closed only once)

return line
}

type tailBuffer struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit (for later): Later (in a follow-up PR) we could move tailBuffer, formatting-related helpers to separate files. But let's do that later 👍

Comment on lines 490 to 507
func stopTimer(timer *time.Timer) {
if timer == nil {
return
}
if !timer.Stop() {
drainTimer(timer)
}
}

func resetTimer(timer *time.Timer, d time.Duration) {
if timer == nil {
return
}
if !timer.Stop() {
drainTimer(timer)
}
timer.Reset(d)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

those functions look super similar, can't we have just one, stopTimer? resetTimer is called only once and it could use stopTimer with another line timer.Reset(d).

cmd.AddCommand(newStop())
cmd.AddCommand(newUpdate())
cmd.AddCommand(newUpdatePermissions())
cmd.AddCommand(newLogsCommand())
Copy link
Member

@pkosiec pkosiec Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies, I know I put a comment to add the command as a part of the file but now I realized it is generated from API files in one of our internal repositories.

Could we please revert the changes in ffe6e3d? 🙏 I can do that, if you'd like to 👍 Thank you!

(For a later follow-up PR)
@fjakobs @pietern The /logz and /logz/stream endpoints don't seem to be part of the public API. Should we keep it as it is (after the revert of the mentioned commit) or should I include the endpoints in our proto files in the universe repo?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can move this into overrides.go in the same directory.

See cmdOverrides.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pkosiec The endpoints are not public APIs at the platform level, they are specific to the apps runtime.

That's why they are not documented in the platform level API docs.

Copy link
Contributor

@pietern pietern left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @ericfeunekes, thanks for the PR!

The token changes (cmd/auth/token_* and libs/auth/token_*) are not necessary. You can use the TokenSource function on the SDK configuration directly to get a fresh OAuth token. I confirmed that the following patch works: https://gist.github.com/pietern/41d02acc2907416244789c7408fb232f

Please move logstream to libs/apps to make it clearer that it is specific to apps.

cmd.AddCommand(newStop())
cmd.AddCommand(newUpdate())
cmd.AddCommand(newUpdatePermissions())
cmd.AddCommand(newLogsCommand())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can move this into overrides.go in the same directory.

See cmdOverrides.

@pietern
Copy link
Contributor

pietern commented Nov 17, 2025

Btw, when I stop an app during tail, the logs command doesn't stop or error.

@pietern
Copy link
Contributor

pietern commented Nov 17, 2025

@ericfeunekes I forgot an important detail in the previous message.

Contributing to CLI requires a signed CLA, if that's something you're willing to do, please reach out with a request to sign CLA to [email protected] and we will take it from there.

Thanks!

Eric Feunekes and others added 9 commits November 18, 2025 09:43
Replace custom token acquisition logic with SDK's built-in TokenSource
function as suggested by @pietern. This simplifies the implementation
by delegating token lifecycle management to the SDK configuration.

Changes:
- Use cfg.GetTokenSource() instead of auth.AcquireToken()
- Remove custom token_loader abstraction (libs/auth/token_loader.go)
- Revert cmd/auth/token.go to original implementation
- Remove tokenAcquireTimeout constant (handled by SDK)

This addresses review feedback from @pietern on Nov 17, 2025.
Reference: https://gist.github.com/pietern/41d02acc2907416244789c7408fb232f

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
Relocate logstream from libs/logstream to libs/apps/logstream to make
it clearer that this package is specific to Databricks Apps functionality.

Changes:
- Move libs/logstream/streamer.go to libs/apps/logstream/streamer.go
- Move libs/logstream/streamer_test.go to libs/apps/logstream/streamer_test.go
- Update import in cmd/workspace/apps/logs.go

This addresses review feedback from @pietern on Nov 17, 2025.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
Group individual const declarations into a single const block for better
code organization and readability.

Changes:
- Convert six individual const declarations to const block
- Apply alignment formatting

This addresses review feedback from @pkosiec on Nov 14, 2025.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
When using --tail-lines with -f (follow), the buffer was flushing too
early, showing all historical logs instead of just the last N lines.

The tail buffer maintains a rolling window of the last N lines, but it
was flushing as soon as it reached N lines when following, rather than
waiting for the prefetch window to collect all historical logs first.

Fix: Always respect the flush deadline (prefetch window) regardless of
follow mode. This allows historical logs to stream in and the rolling
buffer to maintain only the truly last N lines before flushing.

Example with --tail-lines 10 -f:
- Before: Shows lines 1-10, then 11-1000+ (all logs)
- After: Shows lines 991-1000, then new logs (correct)

This addresses review feedback from @pkosiec on Nov 14, 2025.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
Replace magic number (30 seconds) with defaultHandshakeTimeout constant
for better code maintainability.

Changes:
- Add defaultHandshakeTimeout = 30 * time.Second to const block
- Use constant in newLogStreamDialer instead of inline value

This addresses review feedback from @pkosiec on Nov 12/14, 2025.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
@ericfeunekes ericfeunekes requested a review from a team as a code owner November 18, 2025 15:05
When following app logs (--follow) and the app stops or is deleted,
the logs command now detects this and exits gracefully instead of
retrying indefinitely.

Implementation:
- Added AppStatusChecker callback to logstream.Config
- Check app status before each reconnect attempt in follow mode
- Also check when connection closes normally during follow
- Exit with clear error message when app is stopped/deleting/error state

This addresses @pietern's comment about the command not stopping when
the app is stopped during tail.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
@github-actions
Copy link

An authorized user can trigger integration tests manually by following the instructions below:

Trigger:
go/deco-tests-run/cli

Inputs:

  • PR number: 3908
  • Commit SHA: ce50048dcd5743ec1931ccc58c4666669ee24245

Checks will be approved automatically on success.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants