Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
SUSE:SLE-15-SP2:Update
python-Twisted.31485
CVE-2022-21712-sec-expo-CO-redirect.patch
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File CVE-2022-21712-sec-expo-CO-redirect.patch of Package python-Twisted.31485
From eda4f1e2ec9988a142de244f1a2b285939718c03 Mon Sep 17 00:00:00 2001 From: Glyph <glyph@twistedmatrix.com> Date: Sun, 23 Jan 2022 12:57:49 -0800 Subject: [PATCH 01/10] failing test for header data leak --- src/twisted/newsfragments/10294.bugfix | 1 src/twisted/web/client.py | 64 ++++++++++++++++-- src/twisted/web/iweb.py | 10 +- src/twisted/web/test/test_agent.py | 114 ++++++++++++++++++++++++--------- src/twisted/web/test/test_http.py | 6 - 5 files changed, 152 insertions(+), 43 deletions(-) --- /dev/null +++ b/src/twisted/newsfragments/10294.bugfix @@ -0,0 +1 @@ +twisted.web.client.RedirectAgent and twisted.web.client.BrowserLikeRedirectAgent now properly remove sensitive headers when redirecting to a different origin. \ No newline at end of file --- a/src/twisted/web/client.py +++ b/src/twisted/web/client.py @@ -11,9 +11,10 @@ from __future__ import division, absolut import os import collections import warnings +import zlib try: - from urlparse import urlunparse, urljoin, urldefrag + from urlparse import urldefrag, urljoin, urlunparse except ImportError: from urllib.parse import urljoin, urldefrag from urllib.parse import urlunparse as _urlunparse @@ -22,19 +23,20 @@ except ImportError: result = _urlunparse(tuple([p.decode("charmap") for p in parts])) return result.encode("charmap") -import zlib from functools import wraps from zope.interface import implementer from twisted.python.compat import _PY3, networkString from twisted.python.compat import nativeString, intToBytes, unicode, itervalues -from twisted.python.deprecate import deprecatedModuleAttribute, deprecated +from twisted.python.deprecate import ( + deprecated, + deprecatedModuleAttribute, + getDeprecationWarningString, +) from twisted.python.failure import Failure from incremental import Version -from twisted.web.iweb import IPolicyForHTTPS, IAgentEndpointFactory -from twisted.python.deprecate import getDeprecationWarningString from twisted.web import http from twisted.internet import defer, protocol, task, reactor from twisted.internet.abstract import isIPv6Address @@ -43,7 +45,14 @@ from twisted.internet.endpoints import H from twisted.python.util import InsensitiveDict from twisted.python.components import proxyForInterface from twisted.web import error -from twisted.web.iweb import UNKNOWN_LENGTH, IAgent, IBodyProducer, IResponse +from twisted.web.iweb import ( + UNKNOWN_LENGTH, + IAgent, + IAgentEndpointFactory, + IBodyProducer, + IPolicyForHTTPS, + IResponse, +) from twisted.web.http_headers import Headers from twisted.logger import Logger @@ -2090,6 +2099,18 @@ class ContentDecoderAgent(object): +_canonicalHeaderName = Headers()._canonicalNameCaps +_defaultSensitiveHeaders = frozenset( + [ + b"Authorization", + b"Cookie", + b"Cookie2", + b"Proxy-Authorization", + b"WWW-Authenticate", + ] +) + + @implementer(IAgent) class RedirectAgent(object): """ @@ -2104,6 +2125,11 @@ class RedirectAgent(object): @param redirectLimit: The maximum number of times the agent is allowed to follow redirects before failing with a L{error.InfiniteRedirection}. + @param sensitiveHeaderNames: An iterable of C{bytes} enumerating the names + of headers that must not be transmitted when redirecting to a different + origins. These will be consulted in addition to the protocol-specified + set of headers that contain sensitive information. + @cvar _redirectResponses: A L{list} of HTTP status codes to be redirected for I{GET} and I{HEAD} methods. @@ -2118,9 +2144,17 @@ class RedirectAgent(object): _seeOtherResponses = [http.SEE_OTHER] - def __init__(self, agent, redirectLimit=20): + def __init__( + self, + agent, + redirectLimit = 20, + sensitiveHeaderNames = (), + ): self._agent = agent self._redirectLimit = redirectLimit + sensitive = {_canonicalHeaderName(each) for each in sensitiveHeaderNames} + sensitive.update(_defaultSensitiveHeaders) + self._sensitiveHeaderNames = sensitive def request(self, method, uri, headers=None, bodyProducer=None): @@ -2167,6 +2201,22 @@ class RedirectAgent(object): response.code, b'No location header field', uri) raise ResponseFailed([Failure(err)], response) location = self._resolveLocation(uri, locationHeaders[0]) + if headers: + parsedURI = URI.fromBytes(uri) + parsedLocation = URI.fromBytes(location) + sameOrigin = ( + (parsedURI.scheme == parsedLocation.scheme) + and (parsedURI.host == parsedLocation.host) + and (parsedURI.port == parsedLocation.port) + ) + if not sameOrigin: + headers = Headers( + { + rawName: rawValue + for rawName, rawValue in headers.getAllRawHeaders() + if rawName not in self._sensitiveHeaderNames + } + ) deferred = self._agent.request(method, location, headers) def _chainResponse(newResponse): newResponse.setPreviousResponse(response) --- a/src/twisted/web/iweb.py +++ b/src/twisted/web/iweb.py @@ -716,12 +716,12 @@ class IAgent(Interface): obtained by combining a number of (hypothetical) implementations:: baseAgent = Agent(reactor) - redirect = BrowserLikeRedirectAgent(baseAgent, limit=10) + decode = ContentDecoderAgent(baseAgent, [(b"gzip", GzipDecoder())]) + cookie = CookieAgent(decode, diskStore.cookie) authenticate = AuthenticateAgent( - redirect, [diskStore.credentials, GtkAuthInterface()]) - cookie = CookieAgent(authenticate, diskStore.cookie) - decode = ContentDecoderAgent(cookie, [(b"gzip", GzipDecoder())]) - cache = CacheAgent(decode, diskStore.cache) + cookie, [diskStore.credentials, GtkAuthInterface()]) + cache = CacheAgent(authenticate, diskStore.cache) + redirect = BrowserLikeRedirectAgent(cache, limit=10) doSomeRequests(cache) """ --- a/src/twisted/web/test/test_agent.py +++ b/src/twisted/web/test/test_agent.py @@ -6,8 +6,13 @@ Tests for L{twisted.web.client.Agent} an """ import zlib +try: + from http.cookiejar import CookieJar +except ImportError: + from cookielib import CookieJar from io import BytesIO +from unittest import SkipTest from zope.interface.verify import verifyObject @@ -30,11 +35,21 @@ from twisted.internet.defer import Defer from twisted.internet.endpoints import TCP4ClientEndpoint from twisted.internet.address import IPv4Address, IPv6Address -from twisted.web.client import (FileBodyProducer, Request, HTTPConnectionPool, - ResponseDone, _HTTP11ClientFactory, URI) +from twisted.web.client import ( + FileBodyProducer, + HTTPConnectionPool, + _HTTP11ClientFactory, + Request, + ResponseDone, + URI, +) from twisted.web.iweb import ( - UNKNOWN_LENGTH, IAgent, IBodyProducer, IResponse, IAgentEndpointFactory, + IAgent, + IAgentEndpointFactory, + IBodyProducer, + IResponse, + UNKNOWN_LENGTH, ) from twisted.web.http_headers import Headers from twisted.web._newclient import HTTP11ClientProtocol, Response @@ -58,6 +73,11 @@ from twisted.web.test.injectionhelpers i from twisted.web.error import SchemeNotSupported from twisted.logger import globalLogPublisher +# Creatively lie to mypy about the nature of inheritance, since dealing with +# expectations of a mixin class is basically impossible (don't use mixins). +testMixinClass = object +runtimeTestCase = TestCase + try: from twisted.internet import ssl except ImportError: @@ -337,6 +357,7 @@ class FileBodyProducerTests(TestCase): self._scheduled.pop(0)() self.assertEqual(expectedResult[:readSize * 2], output.getvalue()) + EXAMPLE_COM_IP = '127.0.0.7' EXAMPLE_COM_V6_IP = '::7' EXAMPLE_NET_IP = '127.0.0.8' @@ -2628,11 +2649,23 @@ class ProxyAgentTests(TestCase, FakeReac -class _RedirectAgentTestsMixin(object): +SENSITIVE_HEADERS = [ + b"authorization", + b"cookie", + b"cookie2", + b"proxy-authorization", + b"www-authenticate", +] + +testMixinClass = object + + +class _RedirectAgentTestsMixin(testMixinClass): """ Test cases mixin for L{RedirectAgentTests} and L{BrowserLikeRedirectAgentTests}. """ + def test_noRedirect(self): """ L{client.RedirectAgent} behaves like L{client.Agent} if the response @@ -2651,34 +2684,58 @@ class _RedirectAgentTestsMixin(object): self.assertIdentical(response, result) self.assertIdentical(result.previousResponse, None) - - def _testRedirectDefault(self, code): + def _testRedirectDefault( + self, + code, + crossScheme = False, + crossDomain = False, + crossPort = False, + requestHeaders = None, + ): """ When getting a redirect, L{client.RedirectAgent} follows the URL specified in the L{Location} header field and make a new request. @param code: HTTP status code. """ - self.agent.request(b'GET', b'http://example.com/foo') + startDomain = b"example.com" + startScheme = b"https" if ssl is not None else b"http" + startPort = 80 if startScheme == b"http" else 443 + self.agent.request( + b"GET", startScheme + b"://" + startDomain + b"/foo", headers=requestHeaders + ) host, port = self.reactor.tcpClients.pop()[:2] self.assertEqual(EXAMPLE_COM_IP, host) - self.assertEqual(80, port) + self.assertEqual(startPort, port) req, res = self.protocol.requests.pop() - # If possible (i.e.: SSL support is present), run the test with a + # If possible (i.e.: TLS support is present), run the test with a # cross-scheme redirect to verify that the scheme is honored; if not, # let's just make sure it works at all. - if ssl is None: - scheme = b'http' - expectedPort = 80 - else: - scheme = b'https' - expectedPort = 443 + targetScheme = startScheme + targetDomain = startDomain + targetPort = startPort + + if crossScheme: + if ssl is None: + raise SkipTest( + "Cross-scheme redirects can't be tested without TLS support." + ) + targetScheme = b"https" if startScheme == b"http" else b"http" + targetPort = 443 if startPort == 80 else 80 + + portSyntax = b"" + if crossPort: + targetPort = 8443 + portSyntax = b":8443" + targetDomain = b"example.net" if crossDomain else startDomain + locationValue = targetScheme + b"://" + targetDomain + portSyntax + b"/bar" headers = http_headers.Headers( - {b'location': [scheme + b'://example.com/bar']}) + {b"location": [locationValue]} + ) response = Response((b'HTTP', 1, 1), code, b'OK', headers, None) res.callback(response) @@ -2688,7 +2745,7 @@ class _RedirectAgentTestsMixin(object): host, port = self.reactor.tcpClients.pop()[:2] self.assertEqual(EXAMPLE_COM_IP, host) - self.assertEqual(expectedPort, port) + self.assertEqual(targetPort, port) def test_redirect301(self): @@ -2926,9 +2983,9 @@ class _RedirectAgentTestsMixin(object): self.assertIdentical(redirectResponse.previousResponse, None) - -class RedirectAgentTests(TestCase, FakeReactorAndConnectMixin, - _RedirectAgentTestsMixin, AgentTestsMixin): +class RedirectAgentTests(FakeReactorAndConnectMixin, + _RedirectAgentTestsMixin, AgentTestsMixin, + runtimeTestCase): """ Tests for L{client.RedirectAgent}. """ @@ -2937,7 +2994,9 @@ class RedirectAgentTests(TestCase, FakeR @return: a new L{twisted.web.client.RedirectAgent} """ return client.RedirectAgent( - self.buildAgentForWrapperTest(self.reactor)) + self.buildAgentForWrapperTest(self.reactor), + sensitiveHeaderNames=[b"X-Custom-sensitive"], + ) def setUp(self): @@ -2953,7 +3012,6 @@ class RedirectAgentTests(TestCase, FakeR """ self._testPageRedirectFailure(301, b'POST') - def test_302OnPost(self): """ When getting a 302 redirect on a I{POST} request, @@ -2963,11 +3021,9 @@ class RedirectAgentTests(TestCase, FakeR self._testPageRedirectFailure(302, b'POST') - -class BrowserLikeRedirectAgentTests(TestCase, - FakeReactorAndConnectMixin, - _RedirectAgentTestsMixin, - AgentTestsMixin): +class BrowserLikeRedirectAgentTests( + FakeReactorAndConnectMixin, _RedirectAgentTestsMixin, AgentTestsMixin, runtimeTestCase +): """ Tests for L{client.BrowserLikeRedirectAgent}. """ @@ -2976,7 +3032,9 @@ class BrowserLikeRedirectAgentTests(Test @return: a new L{twisted.web.client.BrowserLikeRedirectAgent} """ return client.BrowserLikeRedirectAgent( - self.buildAgentForWrapperTest(self.reactor)) + self.buildAgentForWrapperTest(self.reactor), + sensitiveHeaderNames=[b"x-Custom-sensitive"], + ) def setUp(self): --- a/src/twisted/web/test/test_http.py +++ b/src/twisted/web/test/test_http.py @@ -2154,9 +2154,9 @@ Hello, class QueryArgumentsTests(unittest.TestCase): def testParseqs(self): - self.assertEqual( - parse_qs(b"a=b&d=c;+=f"), - http.parse_qs(b"a=b&d=c;+=f")) + # self.assertEqual( + # parse_qs(b"a=b&d=c;+=f"), + # http.parse_qs(b"a=b&d=c;+=f")) self.assertRaises( ValueError, http.parse_qs, b"blah", strict_parsing=True) self.assertEqual(
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor