diff --git a/webapp/graphite/account/views.py b/webapp/graphite/account/views.py index 8c5fd662e..f397269d8 100644 --- a/webapp/graphite/account/views.py +++ b/webapp/graphite/account/views.py @@ -19,16 +19,28 @@ from django.urls import reverse from django.http import HttpResponseRedirect from django.shortcuts import render +from django.utils.http import url_has_allowed_host_and_scheme from graphite.user_util import getProfile, isAuthenticated +def _get_safe_next_page(request, param_value): + """Return param_value if it is a safe local redirect target, otherwise the browser URL.""" + if url_has_allowed_host_and_scheme( + url=param_value, + allowed_hosts={request.get_host()}, + require_https=request.is_secure(), + ): + return param_value + return reverse('browser') + + def loginView(request): username = request.POST.get('username') password = request.POST.get('password') if request.method == 'GET': - nextPage = request.GET.get('nextPage', reverse('browser')) + nextPage = _get_safe_next_page(request, request.GET.get('nextPage', reverse('browser'))) else: - nextPage = request.POST.get('nextPage', reverse('browser')) + nextPage = _get_safe_next_page(request, request.POST.get('nextPage', reverse('browser'))) if username and password: user = authenticate(username=username,password=password) if user is None: @@ -43,7 +55,7 @@ def loginView(request): def logoutView(request): - nextPage = request.GET.get('nextPage', reverse('browser')) + nextPage = _get_safe_next_page(request, request.GET.get('nextPage', reverse('browser'))) logout(request) return HttpResponseRedirect(nextPage) @@ -60,5 +72,5 @@ def updateProfile(request): if profile: profile.advancedUI = request.POST.get('advancedUI','off') == 'on' profile.save() - nextPage = request.POST.get('nextPage', reverse('browser')) + nextPage = _get_safe_next_page(request, request.POST.get('nextPage', reverse('browser'))) return HttpResponseRedirect(nextPage) diff --git a/webapp/graphite/url_shortener/views.py b/webapp/graphite/url_shortener/views.py index bbda14c8d..0f96f196f 100644 --- a/webapp/graphite/url_shortener/views.py +++ b/webapp/graphite/url_shortener/views.py @@ -14,7 +14,10 @@ def follow(request, link_id): """Follow existing links""" key = base62.to_decimal(link_id) link = get_object_or_404(Link, pk=key) - return HttpResponsePermanentRedirect(reverse('browser') + link.url) + # Strip leading slashes from the stored URL to prevent open redirect via + # protocol-relative URLs (e.g. //evil.com) being used as redirect targets. + url = reverse('browser') + link.url.lstrip('/') + return HttpResponsePermanentRedirect(url) def shorten(request, path): diff --git a/webapp/tests/test_account.py b/webapp/tests/test_account.py new file mode 100644 index 000000000..9007c7ea8 --- /dev/null +++ b/webapp/tests/test_account.py @@ -0,0 +1,71 @@ +# -*- coding: utf-8 -*- + +from django.contrib.auth.models import User +from django.urls import reverse +from .base import TestCase + + +class LogoutViewTest(TestCase): + def test_logout_default_redirect(self): + """Logout without nextPage redirects to the browser.""" + url = reverse('account_logout') + response = self.client.get(url) + self.assertEqual(response.status_code, 302) + self.assertEqual(response['Location'], reverse('browser')) + + def test_logout_safe_relative_redirect(self): + """Logout with a safe relative nextPage redirects there.""" + url = reverse('account_logout') + response = self.client.get(url, {'nextPage': '/graphite/'}) + self.assertEqual(response.status_code, 302) + self.assertEqual(response['Location'], '/graphite/') + + def test_logout_open_redirect_blocked(self): + """Logout with an external nextPage falls back to the browser URL.""" + url = reverse('account_logout') + response = self.client.get(url, {'nextPage': 'http://evil.example.com'}) + self.assertEqual(response.status_code, 302) + self.assertEqual(response['Location'], reverse('browser')) + + def test_logout_open_redirect_blocked_https(self): + """Logout with an external HTTPS nextPage falls back to the browser URL.""" + url = reverse('account_logout') + response = self.client.get(url, {'nextPage': 'https://evil.example.com/path'}) + self.assertEqual(response.status_code, 302) + self.assertEqual(response['Location'], reverse('browser')) + + +class LoginViewTest(TestCase): + def setUp(self): + self.user = User.objects.create_user( + username='testuser', password='testpass', is_active=True + ) + + def test_login_open_redirect_blocked_get(self): + """GET login page with external nextPage falls back to the browser URL.""" + url = reverse('account_login') + response = self.client.get(url, {'nextPage': 'http://evil.example.com'}) + self.assertEqual(response.status_code, 200) + self.assertContains(response, reverse('browser')) + + def test_login_open_redirect_blocked_post(self): + """POST login with external nextPage falls back to the browser URL.""" + url = reverse('account_login') + response = self.client.post(url, { + 'username': 'testuser', + 'password': 'testpass', + 'nextPage': 'http://evil.example.com', + }) + self.assertEqual(response.status_code, 302) + self.assertEqual(response['Location'], reverse('browser')) + + def test_login_safe_relative_redirect(self): + """POST login with a safe relative nextPage redirects there.""" + url = reverse('account_login') + response = self.client.post(url, { + 'username': 'testuser', + 'password': 'testpass', + 'nextPage': '/graphite/', + }) + self.assertEqual(response.status_code, 302) + self.assertEqual(response['Location'], '/graphite/') diff --git a/webapp/tests/test_finders.py b/webapp/tests/test_finders.py index 05ac11560..176ba66a8 100644 --- a/webapp/tests/test_finders.py +++ b/webapp/tests/test_finders.py @@ -148,83 +148,83 @@ def test_standard_finder(self,scandir_mock): finder = get_finders('graphite.finders.standard.StandardFinder')[0] - scandir_mock.call_count = 0 + scandir_mock.reset_mock() nodes = finder.find_nodes(FindQuery('foo', None, None)) self.assertEqual(len(list(nodes)), 2) self.assertEqual(scandir_mock.call_count, 0) - scandir_mock.call_count = 0 + scandir_mock.reset_mock() nodes = finder.find_nodes(FindQuery('foo.bar.baz', None, None)) self.assertEqual(len(list(nodes)), 1) self.assertEqual(scandir_mock.call_count, 0) # test for https://github.com/grafana/grafana/issues/5936 - scandir_mock.call_count = 0 + scandir_mock.reset_mock() nodes = finder.find_nodes(FindQuery('foo.{bar}.baz', None, None)) self.assertEqual(len(list(nodes)), 1) self.assertEqual(scandir_mock.call_count, 0) - scandir_mock.call_count = 0 + scandir_mock.reset_mock() nodes = finder.find_nodes(FindQuery('foo.{bar,}.baz', None, None)) self.assertEqual(len(list(nodes)), 1) self.assertEqual(scandir_mock.call_count, 0) - scandir_mock.call_count = 0 + scandir_mock.reset_mock() nodes = finder.find_nodes(FindQuery('*.ba?.{baz,foo}', None, None)) self.assertEqual(len(list(nodes)), 2) self.assertEqual(scandir_mock.call_count, 4) - scandir_mock.call_count = 0 + scandir_mock.reset_mock() nodes = finder.find_nodes(FindQuery('{foo,bar}.{baz,bar}.{baz,foo}', None, None)) self.assertEqual(len(list(nodes)), 2) self.assertEqual(scandir_mock.call_count, 0) - scandir_mock.call_count = 0 + scandir_mock.reset_mock() nodes = finder.find_nodes(FindQuery('{foo}.bar.*', None, None)) self.assertEqual(len(list(nodes)), 1) self.assertEqual(scandir_mock.call_count, 1) - scandir_mock.call_count = 0 + scandir_mock.reset_mock() nodes = finder.find_nodes(FindQuery('foo.{ba{r,z},baz}.baz', None, None)) self.assertEqual(len(list(nodes)), 1) self.assertEqual(scandir_mock.call_count, 0) - scandir_mock.call_count = 0 + scandir_mock.reset_mock() nodes = finder.find_nodes(FindQuery('{foo,garbage}.bar.baz', None, None)) self.assertEqual(len(list(nodes)), 1) self.assertEqual(scandir_mock.call_count, 0) - scandir_mock.call_count = 0 + scandir_mock.reset_mock() nodes = finder.find_nodes(FindQuery('{fo{o}}.bar.baz', None, None)) self.assertEqual(len(list(nodes)), 1) self.assertEqual(scandir_mock.call_count, 0) - scandir_mock.call_count = 0 + scandir_mock.reset_mock() nodes = finder.find_nodes(FindQuery('foo{}.bar.baz', None, None)) self.assertEqual(len(list(nodes)), 1) self.assertEqual(scandir_mock.call_count, 0) - scandir_mock.call_count = 0 + scandir_mock.reset_mock() nodes = finder.find_nodes(FindQuery('{fo,ba}{o}.bar.baz', None, None)) self.assertEqual(len(list(nodes)), 1) self.assertEqual(scandir_mock.call_count, 0) - scandir_mock.call_count = 0 + scandir_mock.reset_mock() nodes = finder.find_nodes(FindQuery('{fo,ba}{o,o}.bar.baz', None, None)) self.assertEqual(len(list(nodes)), 1) self.assertEqual(scandir_mock.call_count, 0) - scandir_mock.call_count = 0 + scandir_mock.reset_mock() nodes = finder.find_nodes(FindQuery('{fo,ba}{o,z}.bar.baz', None, None)) self.assertEqual(len(list(nodes)), 1) self.assertEqual(scandir_mock.call_count, 0) - scandir_mock.call_count = 0 + scandir_mock.reset_mock() nodes = finder.find_nodes(FindQuery('foo;bar=baz', None, None)) self.assertEqual(len(list(nodes)), 1) self.assertEqual(scandir_mock.call_count, 0) - scandir_mock.call_count = 0 + scandir_mock.reset_mock() nodes = finder.find_nodes(FindQuery('foo;bar=baz2', None, None)) self.assertEqual(len(list(nodes)), 1) self.assertEqual(scandir_mock.call_count, 0) @@ -233,7 +233,7 @@ def test_standard_finder(self,scandir_mock): self.assertEqual(results, []) finally: - scandir_mock.call_count = 0 + scandir_mock.reset_mock() self.wipe_whisper() @patch('graphite.finders.standard.scandir', wraps=scandir_mock) @@ -244,18 +244,18 @@ def test_standard_finder_gzipped_whisper(self, scandir_mock): self.create_whisper(join('bar', 'baz', 'foo.wsp')) finder = get_finders('graphite.finders.standard.StandardFinder')[0] - scandir_mock.call_count = 0 + scandir_mock.reset_mock() nodes = finder.find_nodes(FindQuery('foo', None, None)) self.assertEqual(len(list(nodes)), 2) self.assertEqual(scandir_mock.call_count, 0) - scandir_mock.call_count = 0 + scandir_mock.reset_mock() nodes = finder.find_nodes(FindQuery('foo{}.bar.baz', None, None)) self.assertEqual(len(list(nodes)), 1) self.assertEqual(scandir_mock.call_count, 0) finally: - scandir_mock.call_count = 0 + scandir_mock.reset_mock() self.wipe_whisper() def test_standard_finder_tagged_whisper_carbonlink(self): diff --git a/webapp/tests/test_url_shortener.py b/webapp/tests/test_url_shortener.py new file mode 100644 index 000000000..04f2bf3fd --- /dev/null +++ b/webapp/tests/test_url_shortener.py @@ -0,0 +1,45 @@ +try: + from django.urls import reverse +except ImportError: # Django < 1.10 + from django.core.urlresolvers import reverse + +from urllib.parse import urlparse + +from .base import TestCase + + +class UrlShortenerTest(TestCase): + def test_shorten_and_follow(self): + """Test normal URL shortening and following.""" + shorten_url = reverse('shorten', kwargs={'path': 'render/'}) + response = self.client.get(shorten_url, {'target': 'test'}) + self.assertEqual(response.status_code, 200) + short_path = response.content.decode('utf-8') + + follow_response = self.client.get(short_path) + self.assertEqual(follow_response.status_code, 301) + redirect_url = follow_response['Location'] + # Should redirect to an internal path, not a protocol-relative or absolute URL + self.assertFalse(redirect_url.startswith('//')) + parsed = urlparse(redirect_url) + self.assertFalse(bool(parsed.netloc), + 'Redirect to external domain detected: %s' % redirect_url) + + def test_follow_open_redirect_prevention(self): + """Test that protocol-relative URLs stored in links cannot cause open redirects.""" + # Simulate shortening a crafted path that starts with // + shorten_url = reverse('shorten', kwargs={'path': '//evil.com'}) + response = self.client.get(shorten_url) + self.assertEqual(response.status_code, 200) + short_path = response.content.decode('utf-8') + + follow_response = self.client.get(short_path) + self.assertEqual(follow_response.status_code, 301) + redirect_url = follow_response['Location'] + # The redirect must not be a protocol-relative URL pointing to an external domain + self.assertFalse(redirect_url.startswith('//'), + 'Open redirect detected: %s' % redirect_url) + # Should not redirect to an external host + parsed = urlparse(redirect_url) + self.assertFalse(bool(parsed.netloc), + 'Open redirect to external domain detected: %s' % redirect_url)