44Corresponds to TypeScript file: src/server/auth/handlers/authorize.ts
55"""
66
7- import re
8- from urllib .parse import urlparse , urlunparse , urlencode
9- from typing import Any , Callable , Dict , List , Literal , Optional
10- from urllib .parse import urlencode , parse_qs
7+ from typing import Callable , Literal , Optional
8+ from urllib .parse import parse_qs , urlencode , urlparse , urlunparse
119
12- from starlette .requests import Request
13- from starlette .responses import JSONResponse , RedirectResponse , Response
1410from pydantic import AnyHttpUrl , AnyUrl , BaseModel , Field , ValidationError
15- from pydantic_core import Url
11+ from starlette .requests import Request
12+ from starlette .responses import RedirectResponse , Response
1613
1714from mcp .server .auth .errors import (
18- InvalidClientError ,
15+ InvalidClientError ,
1916 InvalidRequestError ,
20- UnsupportedResponseTypeError ,
21- ServerError ,
2217 OAuthError ,
2318)
2419from mcp .server .auth .provider import AuthorizationParams , OAuthServerProvider
2823class AuthorizationRequest (BaseModel ):
2924 """
3025 Model for the authorization request parameters.
31-
26+
3227 Corresponds to request schema in authorizationHandler in src/server/auth/handlers/authorize.ts
3328 """
29+
3430 client_id : str = Field (..., description = "The client ID" )
35- redirect_uri : AnyHttpUrl | None = Field (..., description = "URL to redirect to after authorization" )
31+ redirect_uri : AnyHttpUrl | None = Field (
32+ ..., description = "URL to redirect to after authorization"
33+ )
3634
37- response_type : Literal ["code" ] = Field (..., description = "Must be 'code' for authorization code flow" )
35+ response_type : Literal ["code" ] = Field (
36+ ..., description = "Must be 'code' for authorization code flow"
37+ )
3838 code_challenge : str = Field (..., description = "PKCE code challenge" )
39- code_challenge_method : Literal ["S256" ] = Field ("S256" , description = "PKCE code challenge method, must be S256" )
39+ code_challenge_method : Literal ["S256" ] = Field (
40+ "S256" , description = "PKCE code challenge method, must be S256"
41+ )
4042 state : Optional [str ] = Field (None , description = "Optional state parameter" )
41- scope : Optional [str ] = Field (None , description = "Optional scope; if specified, should be a space-separated list of scope strings" )
42-
43+ scope : Optional [str ] = Field (
44+ None ,
45+ description = "Optional scope; if specified, should be a space-separated list of scope strings" ,
46+ )
47+
4348 class Config :
4449 extra = "ignore"
4550
46- def validate_scope (requested_scope : str | None , client : OAuthClientInformationFull ) -> list [str ] | None :
51+
52+ def validate_scope (
53+ requested_scope : str | None , client : OAuthClientInformationFull
54+ ) -> list [str ] | None :
4755 if requested_scope is None :
4856 return None
4957 requested_scopes = requested_scope .split (" " )
@@ -53,7 +61,10 @@ def validate_scope(requested_scope: str | None, client: OAuthClientInformationFu
5361 raise InvalidRequestError (f"Client was not registered with scope { scope } " )
5462 return requested_scopes
5563
56- def validate_redirect_uri (auth_request : AuthorizationRequest , client : OAuthClientInformationFull ) -> AnyHttpUrl :
64+
65+ def validate_redirect_uri (
66+ auth_request : AuthorizationRequest , client : OAuthClientInformationFull
67+ ) -> AnyHttpUrl :
5768 if auth_request .redirect_uri is not None :
5869 # Validate redirect_uri against client's registered redirect URIs
5970 if auth_request .redirect_uri not in client .redirect_uris :
@@ -64,16 +75,19 @@ def validate_redirect_uri(auth_request: AuthorizationRequest, client: OAuthClien
6475 elif len (client .redirect_uris ) == 1 :
6576 return client .redirect_uris [0 ]
6677 else :
67- raise InvalidRequestError ("redirect_uri must be specified when client has multiple registered URIs" )
78+ raise InvalidRequestError (
79+ "redirect_uri must be specified when client has multiple registered URIs"
80+ )
81+
6882
6983def create_authorization_handler (provider : OAuthServerProvider ) -> Callable :
7084 """
7185 Create a handler for the OAuth 2.0 Authorization endpoint.
72-
86+
7387 Corresponds to authorizationHandler in src/server/auth/handlers/authorize.ts
7488
7589 """
76-
90+
7791 async def authorization_handler (request : Request ) -> Response :
7892 """
7993 Handler for the OAuth 2.0 Authorization endpoint.
@@ -91,74 +105,79 @@ async def authorization_handler(request: Request) -> Response:
91105 auth_request = AuthorizationRequest .model_validate (params )
92106 except ValidationError as e :
93107 raise InvalidRequestError (str (e ))
94-
108+
95109 # Get client information
96110 try :
97111 client = await provider .clients_store .get_client (auth_request .client_id )
98112 except OAuthError as e :
99113 # TODO: proper error rendering
100114 raise InvalidClientError (str (e ))
101-
115+
102116 if not client :
103117 raise InvalidClientError (f"Client ID '{ auth_request .client_id } ' not found" )
104-
105-
118+
106119 # do validation which is dependent on the client configuration
107120 redirect_uri = validate_redirect_uri (auth_request , client )
108121 scopes = validate_scope (auth_request .scope , client )
109-
122+
110123 auth_params = AuthorizationParams (
111124 state = auth_request .state ,
112125 scopes = scopes ,
113126 code_challenge = auth_request .code_challenge ,
114127 redirect_uri = redirect_uri ,
115128 )
116-
129+
117130 try :
118131 # Let the provider handle the authorization flow
119- authorization_code = await provider .create_authorization_code (client , auth_params )
120- response = RedirectResponse (url = "" , status_code = 302 , headers = {"Cache-Control" : "no-store" })
121-
132+ authorization_code = await provider .create_authorization_code (
133+ client , auth_params
134+ )
135+ response = RedirectResponse (
136+ url = "" , status_code = 302 , headers = {"Cache-Control" : "no-store" }
137+ )
138+
122139 # Redirect with code
123140 parsed_uri = urlparse (str (auth_params .redirect_uri ))
124141 query_params = [(k , v ) for k , vs in parse_qs (parsed_uri .query ) for v in vs ]
125142 query_params .append (("code" , authorization_code ))
126143 if auth_params .state :
127144 query_params .append (("state" , auth_params .state ))
128-
129- redirect_url = urlunparse (parsed_uri ._replace (query = urlencode (query_params )))
145+
146+ redirect_url = urlunparse (
147+ parsed_uri ._replace (query = urlencode (query_params ))
148+ )
130149 response .headers ["location" ] = redirect_url
131-
150+
132151 return response
133152 except Exception as e :
134153 return RedirectResponse (
135154 url = create_error_redirect (redirect_uri , e , auth_request .state ),
136155 status_code = 302 ,
137156 headers = {"Cache-Control" : "no-store" },
138- )
139-
157+ )
158+
140159 return authorization_handler
141160
142- def create_error_redirect (redirect_uri : AnyUrl , error : Exception , state : Optional [str ]) -> str :
161+
162+ def create_error_redirect (
163+ redirect_uri : AnyUrl , error : Exception , state : Optional [str ]
164+ ) -> str :
143165 parsed_uri = urlparse (str (redirect_uri ))
144166 if isinstance (error , OAuthError ):
145- query_params = {
146- "error" : error .error_code ,
147- "error_description" : str (error )
148- }
167+ query_params = {"error" : error .error_code , "error_description" : str (error )}
149168 else :
150169 query_params = {
151170 "error" : "internal_error" ,
152- "error_description" : "An unknown error occurred"
171+ "error_description" : "An unknown error occurred" ,
153172 }
154173 # TODO: should we add error_uri?
155174 # if error.error_uri:
156175 # query_params["error_uri"] = str(error.error_uri)
157176 if state :
158177 query_params ["state" ] = state
159-
178+
160179 new_query = urlencode (query_params )
161180 if parsed_uri .query :
162181 new_query = f"{ parsed_uri .query } &{ new_query } "
163-
164- return urlunparse (parsed_uri ._replace (query = new_query ))
182+
183+ return urlunparse (parsed_uri ._replace (query = new_query ))
0 commit comments