Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 56 additions & 5 deletions src/mcp_atlassian/servers/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ def _create_user_config_for_fetcher(
ValueError: If required credentials are missing or auth_type is unsupported.
TypeError: If base_config is not a supported type.
"""
if auth_type not in ["oauth", "pat"]:
if auth_type not in ["oauth", "pat", "basic"]:
raise ValueError(
f"Unsupported auth_type '{auth_type}' for user-specific config creation. Expected 'oauth' or 'pat'."
f"Unsupported auth_type '{auth_type}' for user-specific config creation. Expected 'oauth', 'pat', or 'basic'."
)

username_for_config: str | None = credentials.get("user_email_context")
Expand Down Expand Up @@ -137,6 +137,29 @@ def _create_user_config_for_fetcher(
"api_token": None,
}
)
elif auth_type == "basic":
user_username = credentials.get("username")
user_api_token = credentials.get("api_token")
if not user_username or not user_api_token:
raise ValueError(
"Username and API token missing in credentials for user auth_type 'basic'"
)

# Log warning if cloud_id is provided with Basic auth (not typically needed)
if cloud_id:
logger.warning(
f"Cloud ID '{cloud_id}' provided with Basic authentication. "
"Basic authentication typically uses the base URL directly and doesn't require cloud_id override."
)

common_args.update(
{
"username": user_username,
"api_token": user_api_token,
"personal_token": None,
"oauth_config": None,
}
)

if isinstance(base_config, JiraConfig):
user_jira_config: UserJiraConfigType = dataclasses.replace(
Expand Down Expand Up @@ -181,8 +204,8 @@ async def get_jira_fetcher(ctx: Context) -> JiraFetcher:
return request.state.jira_fetcher
user_auth_type = getattr(request.state, "user_atlassian_auth_type", None)
logger.debug(f"get_jira_fetcher: User auth type: {user_auth_type}")
# If OAuth or PAT token is present, create user-specific fetcher
if user_auth_type in ["oauth", "pat"] and hasattr(
# If OAuth, PAT, or Basic auth token is present, create user-specific fetcher
if user_auth_type in ["oauth", "pat", "basic"] and hasattr(
request.state, "user_atlassian_token"
):
user_token = getattr(request.state, "user_atlassian_token", None)
Expand All @@ -198,6 +221,20 @@ async def get_jira_fetcher(ctx: Context) -> JiraFetcher:
credentials["oauth_access_token"] = user_token
elif user_auth_type == "pat":
credentials["personal_access_token"] = user_token
elif user_auth_type == "basic":
# Decode Basic auth token (base64 encoded "username:password")
import base64

try:
decoded = base64.b64decode(user_token).decode("utf-8")
username, api_token = decoded.split(":", 1)
credentials["username"] = username
credentials["api_token"] = api_token
logger.debug(
f"get_jira_fetcher: Decoded Basic auth for user: {username}"
)
except Exception as e:
raise ValueError(f"Invalid Basic auth token format: {e}")
lifespan_ctx_dict = ctx.request_context.lifespan_context # type: ignore
app_lifespan_ctx: MainAppContext | None = (
lifespan_ctx_dict.get("app_lifespan_context")
Expand Down Expand Up @@ -291,7 +328,7 @@ async def get_confluence_fetcher(ctx: Context) -> ConfluenceFetcher:
return request.state.confluence_fetcher
user_auth_type = getattr(request.state, "user_atlassian_auth_type", None)
logger.debug(f"get_confluence_fetcher: User auth type: {user_auth_type}")
if user_auth_type in ["oauth", "pat"] and hasattr(
if user_auth_type in ["oauth", "pat", "basic"] and hasattr(
request.state, "user_atlassian_token"
):
user_token = getattr(request.state, "user_atlassian_token", None)
Expand All @@ -305,6 +342,20 @@ async def get_confluence_fetcher(ctx: Context) -> ConfluenceFetcher:
credentials["oauth_access_token"] = user_token
elif user_auth_type == "pat":
credentials["personal_access_token"] = user_token
elif user_auth_type == "basic":
# Decode Basic auth token (base64 encoded "username:password")
import base64

try:
decoded = base64.b64decode(user_token).decode("utf-8")
username, api_token = decoded.split(":", 1)
credentials["username"] = username
credentials["api_token"] = api_token
logger.debug(
f"get_confluence_fetcher: Decoded Basic auth for user: {username}"
)
except Exception as e:
raise ValueError(f"Invalid Basic auth token format: {e}")
lifespan_ctx_dict = ctx.request_context.lifespan_context # type: ignore
app_lifespan_ctx: MainAppContext | None = (
lifespan_ctx_dict.get("app_lifespan_context")
Expand Down
18 changes: 17 additions & 1 deletion src/mcp_atlassian/servers/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,22 @@ async def dispatch(
f"auth_type='{getattr(request.state, 'user_atlassian_auth_type', 'N/A')}', "
f"token_present={bool(getattr(request.state, 'user_atlassian_token', None))}"
)
elif auth_header and auth_header.startswith("Basic "):
token = auth_header.split(" ", 1)[1].strip()
if not token:
return JSONResponse(
{"error": "Unauthorized: Empty Basic auth token"},
status_code=401,
)
logger.debug(
f"UserTokenMiddleware.dispatch: Basic auth token extracted (masked): ...{mask_sensitive(token, 8)}"
)
request.state.user_atlassian_token = token
request.state.user_atlassian_auth_type = "basic"
request.state.user_atlassian_email = None
logger.debug(
"UserTokenMiddleware.dispatch: Set request.state for Basic auth."
)
elif auth_header and auth_header.startswith("Token "):
token = auth_header.split(" ", 1)[1].strip()
if not token:
Expand All @@ -310,7 +326,7 @@ async def dispatch(
)
return JSONResponse(
{
"error": "Unauthorized: Only 'Bearer <OAuthToken>' or 'Token <PAT>' types are supported."
"error": "Unauthorized: Only 'Bearer <OAuthToken>', 'Basic <BasicAuth>' or 'Token <PAT>' types are supported."
},
status_code=401,
)
Expand Down
4 changes: 2 additions & 2 deletions src/mcp_atlassian/utils/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def get_available_services() -> dict[str, bool | None]:
logger.info(
"Using Confluence Server/Data Center authentication (PAT or Basic Auth)"
)
elif os.getenv("ATLASSIAN_OAUTH_ENABLE", "").lower() in ("true", "1", "yes"):
if os.getenv("ATLASSIAN_OAUTH_ENABLE", "").lower() in ("true", "1", "yes"):
confluence_is_setup = True
logger.info(
"Using Confluence minimal OAuth configuration - expecting user-provided tokens via headers"
Expand Down Expand Up @@ -112,7 +112,7 @@ def get_available_services() -> dict[str, bool | None]:
logger.info(
"Using Jira Server/Data Center authentication (PAT or Basic Auth)"
)
elif os.getenv("ATLASSIAN_OAUTH_ENABLE", "").lower() in ("true", "1", "yes"):
if os.getenv("ATLASSIAN_OAUTH_ENABLE", "").lower() in ("true", "1", "yes"):
jira_is_setup = True
logger.info(
"Using Jira minimal OAuth configuration - expecting user-provided tokens via headers"
Expand Down
Loading