This commit is contained in:
Markos Gogoulos
2026-01-29 14:58:44 +02:00
parent 5c8978453e
commit 095e4d2cb4

View File

@@ -10,11 +10,10 @@ Implements the LTI 1.3 / LTI Advantage flow:
- Manual NRPS Sync
"""
import json
import logging
import traceback
import uuid
from urllib.parse import parse_qs, urlencode, urlparse
from urllib.parse import urlencode
import jwt
from django.http import HttpResponse, HttpResponseRedirect, JsonResponse
@@ -234,10 +233,6 @@ class LaunchView(View):
else:
resource_link_obj = None
# Clear retry counter on successful launch
if 'lti_retry_count' in request.session:
del request.session['lti_retry_count']
create_lti_session(request, user, message_launch, platform)
# Check for media_friendly_token in custom claims
@@ -252,6 +247,8 @@ class LaunchView(View):
target_link_uri = launch_data.get('https://purl.imsglobal.org/spec/lti/claim/target_link_uri', '')
logger.error(f"[LTI LAUNCH DEBUG] target_link_uri: {target_link_uri}")
if '?media_token=' in target_link_uri or '&media_token=' in target_link_uri:
from urllib.parse import parse_qs, urlparse
parsed = urlparse(target_link_uri)
params = parse_qs(parsed.query)
media_token = params.get('media_token', [None])[0]
@@ -264,19 +261,6 @@ class LaunchView(View):
request.session.modified = True
logger.error(f"[LTI LAUNCH DEBUG] Stored media_token in session: {media_token}")
# Handle lti_message_hint from custom claims
lti_message_hint_str = custom_claims.get('lti_message_hint', '')
if lti_message_hint_str:
try:
message_hint_data = json.loads(lti_message_hint_str)
if isinstance(message_hint_data, dict):
# Store in session for later use
if 'lti_session' in request.session:
request.session['lti_session']['message_hint'] = message_hint_data
request.session.modified = True
except (json.JSONDecodeError, ValueError):
pass
LTILaunchLog.objects.create(platform=platform, user=user, resource_link=resource_link_obj, launch_type='resource_link', success=True, claims=claims)
redirect_url = self.determine_redirect(launch_data, resource_link_obj)
@@ -285,18 +269,8 @@ class LaunchView(View):
return HttpResponseRedirect(redirect_url)
except LtiException as e: # noqa
error_message = str(e)
# Special handling for "State not found" errors - attempt retry
if "State not found" in error_message or "state not found" in error_message.lower():
logger.warning("[LTI LAUNCH] State not found error detected, attempting recovery")
return self.handle_state_not_found(request, platform)
# Other LTI exceptions - fail normally
traceback.print_exc()
except Exception as e: # noqa
error_message = str(e)
traceback.print_exc()
if platform:
@@ -304,107 +278,6 @@ class LaunchView(View):
return render(request, 'lti/launch_error.html', {'error': 'LTI Launch Failed', 'message': error_message}, status=400)
def handle_state_not_found(self, request, platform=None):
"""
Handle state not found errors by attempting to restart the OIDC flow.
This can happen when:
- Cookies are blocked/deleted
- Session expired
- Browser privacy settings interfere
"""
try:
# Check retry count to prevent infinite loops
retry_count = request.session.get('lti_retry_count', 0)
MAX_RETRIES = 2
if retry_count >= MAX_RETRIES:
logger.error(f"[LTI RETRY] Max retries ({MAX_RETRIES}) exceeded for state recovery")
return render(
request,
'lti/launch_error.html',
{
'error': 'Authentication Failed',
'message': (
'Unable to establish a secure session. This may be due to browser '
'cookie settings or privacy features. Please try:\n\n'
'1. Enabling cookies for this site\n'
'2. Disabling tracking protection for this site\n'
'3. Using a different browser\n'
'4. Contacting your administrator if the issue persists'
),
},
status=400,
)
# Extract launch parameters from the POST request
id_token = request.POST.get('id_token')
state = request.POST.get('state')
if not id_token:
raise ValueError("No id_token available for retry")
# Decode JWT to extract issuer and target info (no verification needed for this)
unverified = jwt.decode(id_token, options={"verify_signature": False})
iss = unverified.get('iss')
aud = unverified.get('aud') # This is the client_id
target_link_uri = unverified.get('https://purl.imsglobal.org/spec/lti/claim/target_link_uri')
# Get login_hint from JWT sub claim
login_hint = request.POST.get('login_hint') or unverified.get('sub')
if not all([iss, aud, target_link_uri]):
raise ValueError("Missing required parameters for OIDC retry")
# Try to identify platform
if not platform:
try:
platform = LTIPlatform.objects.get(platform_id=iss, client_id=aud)
except LTIPlatform.DoesNotExist:
raise ValueError(f"Platform not found: {iss}/{aud}")
# Increment retry counter
request.session['lti_retry_count'] = retry_count + 1
request.session.modified = True
logger.warning(f"[LTI RETRY] State not found, attempting retry #{retry_count + 1}. " f"Platform: {platform.name}, State: {state}, Target: {target_link_uri}")
# Build OIDC login URL with all parameters
oidc_login_url = request.build_absolute_uri(reverse('lti:oidc_login'))
params = {
'iss': iss,
'client_id': aud,
'target_link_uri': target_link_uri,
'login_hint': login_hint,
}
# Include lti_message_hint if we have it
lti_message_hint = request.POST.get('lti_message_hint')
if lti_message_hint:
params['lti_message_hint'] = lti_message_hint
# Add retry indicator
params['retry'] = retry_count + 1
redirect_url = f"{oidc_login_url}?{urlencode(params)}"
logger.error(f"[LTI RETRY] Redirecting to OIDC login: {redirect_url}")
return HttpResponseRedirect(redirect_url)
except Exception as retry_error:
logger.error(f"[LTI RETRY] Failed to handle state recovery: {retry_error}")
traceback.print_exc()
return render(
request,
'lti/launch_error.html',
{'error': 'LTI Launch Failed', 'message': f'State validation failed and automatic retry was unsuccessful: {str(retry_error)}'},
status=400,
)
def sanitize_claims(self, claims):
"""Remove sensitive data from claims before logging"""
safe_claims = claims.copy()