Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
SUSE:SLE-12-SP3:GA
python-Twisted.22825
CVE-2022-21712-sec-expo-CO-redirect.patch
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File CVE-2022-21712-sec-expo-CO-redirect.patch of Package python-Twisted.22825
From eda4f1e2ec9988a142de244f1a2b285939718c03 Mon Sep 17 00:00:00 2001 From: Glyph <glyph@twistedmatrix.com> Date: Sun, 23 Jan 2022 12:57:49 -0800 Subject: [PATCH 01/10] failing test for header data leak --- twisted/newsfragments/10294.bugfix | 1 twisted/web/client.py | 66 ++++++++++- twisted/web/iweb.py | 10 - twisted/web/test/test_agent.py | 208 +++++++++++++++++++++++++++---------- 4 files changed, 219 insertions(+), 66 deletions(-) --- /dev/null +++ b/twisted/newsfragments/10294.bugfix @@ -0,0 +1 @@ +twisted.web.client.RedirectAgent and twisted.web.client.BrowserLikeRedirectAgent now properly remove sensitive headers when redirecting to a different origin. \ No newline at end of file --- a/twisted/web/client.py +++ b/twisted/web/client.py @@ -11,9 +11,10 @@ from __future__ import division, absolut import os import types import warnings +import zlib try: - from urlparse import urlunparse, urljoin, urldefrag + from urlparse import urldefrag, urljoin, urlunparse as _urlunparse from urllib import splithost, splittype except ImportError: from urllib.parse import splithost, splittype, urljoin, urldefrag @@ -23,19 +24,22 @@ except ImportError: result = _urlunparse(tuple([p.decode("charmap") for p in parts])) return result.encode("charmap") -import zlib from functools import wraps - from zope.interface import implementer +from typing import Iterable + from twisted.python.compat import _PY3, nativeString, intToBytes from twisted.python import log from twisted.python.failure import Failure -from twisted.python.deprecate import deprecatedModuleAttribute from twisted.python.versions import Version from twisted.web.iweb import IPolicyForHTTPS, IAgentEndpointFactory -from twisted.python.deprecate import getDeprecationWarningString +from twisted.python.deprecate import ( + deprecated, + deprecatedModuleAttribute, + getDeprecationWarningString +) from twisted.web import http from twisted.internet import defer, protocol, task, reactor from twisted.internet.interfaces import IProtocol @@ -43,8 +47,15 @@ from twisted.internet.endpoints import T from twisted.python.util import InsensitiveDict from twisted.python.components import proxyForInterface from twisted.web import error -from twisted.web.iweb import UNKNOWN_LENGTH, IAgent, IBodyProducer, IResponse from twisted.web.http_headers import Headers +from twisted.web.iweb import ( + UNKNOWN_LENGTH, + IAgent, + IAgentEndpointFactory, + IBodyProducer, + IPolicyForHTTPS, + IResponse, +) from twisted.web._newclient import _ensureValidURI, _ensureValidMethod @@ -1899,6 +1910,18 @@ class ContentDecoderAgent(object): +_canonicalHeaderName = Headers()._canonicalNameCaps +_defaultSensitiveHeaders = frozenset( + [ + b"Authorization", + b"Cookie", + b"Cookie2", + b"Proxy-Authorization", + b"WWW-Authenticate", + ] +) + + @implementer(IAgent) class RedirectAgent(object): """ @@ -1913,6 +1936,11 @@ class RedirectAgent(object): @param redirectLimit: The maximum number of times the agent is allowed to follow redirects before failing with a L{error.InfiniteRedirection}. + @param sensitiveHeaderNames: An iterable of C{bytes} enumerating the names + of headers that must not be transmitted when redirecting to a different + origins. These will be consulted in addition to the protocol-specified + set of headers that contain sensitive information. + @cvar _redirectResponses: A L{list} of HTTP status codes to be redirected for I{GET} and I{HEAD} methods. @@ -1927,9 +1955,17 @@ class RedirectAgent(object): _seeOtherResponses = [http.SEE_OTHER] - def __init__(self, agent, redirectLimit=20): + def __init__( + self, + agent: IAgent, + redirectLimit: int = 20, + sensitiveHeaderNames: Iterable[bytes] = (), + ): self._agent = agent self._redirectLimit = redirectLimit + sensitive = {_canonicalHeaderName(each) for each in sensitiveHeaderNames} + sensitive.update(_defaultSensitiveHeaders) + self._sensitiveHeaderNames = sensitive def request(self, method, uri, headers=None, bodyProducer=None): @@ -1976,6 +2012,22 @@ class RedirectAgent(object): response.code, 'No location header field', uri) raise ResponseFailed([Failure(err)], response) location = self._resolveLocation(uri, locationHeaders[0]) + if headers: + parsedURI = URI.fromBytes(uri) + parsedLocation = URI.fromBytes(location) + sameOrigin = ( + (parsedURI.scheme == parsedLocation.scheme) + and (parsedURI.host == parsedLocation.host) + and (parsedURI.port == parsedLocation.port) + ) + if not sameOrigin: + headers = Headers( + { + rawName: rawValue + for rawName, rawValue in headers.getAllRawHeaders() + if rawName not in self._sensitiveHeaderNames + } + ) deferred = self._agent.request(method, location, headers) def _chainResponse(newResponse): newResponse.setPreviousResponse(response) --- a/twisted/web/iweb.py +++ b/twisted/web/iweb.py @@ -675,12 +675,12 @@ class IAgent(Interface): obtained by combining a number of (hypothetical) implementations:: baseAgent = Agent(reactor) - redirect = BrowserLikeRedirectAgent(baseAgent, limit=10) + decode = ContentDecoderAgent(baseAgent, [(b"gzip", GzipDecoder())]) + cookie = CookieAgent(decode, diskStore.cookie) authenticate = AuthenticateAgent( - redirect, [diskStore.credentials, GtkAuthInterface()]) - cookie = CookieAgent(authenticate, diskStore.cookie) - decode = ContentDecoderAgent(cookie, [(b"gzip", GzipDecoder())]) - cache = CacheAgent(decode, diskStore.cache) + cookie, [diskStore.credentials, GtkAuthInterface()]) + cache = CacheAgent(authenticate, diskStore.cache) + redirect = BrowserLikeRedirectAgent(cache, limit=10) doSomeRequests(cache) """ --- a/twisted/web/test/test_agent.py +++ b/twisted/web/test/test_agent.py @@ -5,45 +5,86 @@ Tests for L{twisted.web.client.Agent} and related new client APIs. """ -import cookielib import zlib -from StringIO import StringIO +try: + from cookielib import CookieJar +except ImportError: + from http.cookiejar import CookieJar +from io import StringIO +from typing import TYPE_CHECKING, List, Optional, Tuple +from unittest import SkipTest +from zope.interface.declarations import implementer from zope.interface.verify import verifyObject +from incremental import Version + from twisted.trial.unittest import TestCase, SynchronousTestCase from twisted.web import client, error, http_headers -from twisted.web._newclient import RequestNotSent, RequestTransmissionFailed -from twisted.web._newclient import ResponseNeverReceived, ResponseFailed -from twisted.web._newclient import PotentialDataLoss +from twisted.web._newclient import ( + HTTP11ClientProtocol, + PotentialDataLoss, + RequestNotSent, + RequestTransmissionFailed, + Response, + ResponseFailed, + ResponseNeverReceived, +) +from twisted.web.client import ( + URI, + BrowserLikePolicyForHTTPS, + FileBodyProducer, + HostnameCachingHTTPSPolicy, + HTTPConnectionPool, + Request, + ResponseDone, + _HTTP11ClientFactory, +) from twisted.internet import defer, task from twisted.python.failure import Failure +from twisted.test.proto_helpers import ( + MemoryReactorClock, + StringTransport, +) from twisted.python.components import proxyForInterface -from twisted.test.proto_helpers import StringTransport, MemoryReactorClock from twisted.internet.task import Clock -from twisted.internet.error import ConnectionRefusedError, ConnectionDone -from twisted.internet.error import ConnectionLost +from twisted.internet.error import ( + ConnectionDone, + ConnectionLost, + ConnectionRefusedError, +) from twisted.internet.protocol import Protocol, Factory from twisted.internet.defer import Deferred, succeed, CancelledError from twisted.internet.endpoints import TCP4ClientEndpoint, SSL4ClientEndpoint -from twisted.web.client import (FileBodyProducer, Request, HTTPConnectionPool, - ResponseDone, _HTTP11ClientFactory, URI) - from twisted.web.iweb import ( - UNKNOWN_LENGTH, IAgent, IBodyProducer, IResponse, IAgentEndpointFactory, - ) + UNKNOWN_LENGTH, + IAgent, + IAgentEndpointFactory, + IBodyProducer, + IPolicyForHTTPS, + IResponse, +) from twisted.web.http_headers import Headers -from twisted.web._newclient import HTTP11ClientProtocol, Response from twisted.internet.interfaces import IOpenSSLClientConnectionCreator -from zope.interface.declarations import implementer -from twisted.web.iweb import IPolicyForHTTPS +from twisted.web.test.injectionhelpers import ( + MethodInjectionTestsMixin, + URIInjectionTestsMixin, +) + from twisted.python.deprecate import getDeprecationWarningString -from incremental import Version -from twisted.web.client import (BrowserLikePolicyForHTTPS) from twisted.web.error import SchemeNotSupported +# Creatively lie to mypy about the nature of inheritance, since dealing with +# expectations of a mixin class is basically impossible (don't use mixins). +if TYPE_CHECKING: + testMixinClass = TestCase + runtimeTestCase = object +else: + testMixinClass = object + runtimeTestCase = TestCase + try: from twisted.internet import ssl from twisted.protocols.tls import TLSMemoryBIOFactory, TLSMemoryBIOProtocol @@ -63,8 +104,8 @@ class StubHTTPProtocol(Protocol): tuple consisting of the request and the L{Deferred} returned from the request method is appended to this list. """ - def __init__(self): - self.requests = [] + def __init__(self) -> None: + self.requests: List[Tuple[Request, Deferred[IResponse]]] = [] self.state = 'QUIESCENT' @@ -1743,13 +1784,13 @@ class CookieJarTests(TestCase, CookieTes """ Tests for L{twisted.web.client._FakeUrllib2Response} and L{twisted.web.client._FakeUrllib2Request}'s interactions with - C{cookielib.CookieJar} instances. + C{CookieJar} instances. """ def makeCookieJar(self): """ - @return: a C{cookielib.CookieJar} with some sample cookies + @return: a C{CookieJar} with some sample cookies """ - cookieJar = cookielib.CookieJar() + cookieJar = CookieJar() reqres = self.addCookies( cookieJar, 'http://example.com:1234/foo?bar', @@ -1760,7 +1801,7 @@ class CookieJarTests(TestCase, CookieTes def test_extractCookies(self): """ - L{cookielib.CookieJar.extract_cookies} extracts cookie information from + L{CookieJar.extract_cookies} extracts cookie information from fake urllib2 response instances. """ jar = self.makeCookieJar()[0] @@ -1785,7 +1826,7 @@ class CookieJarTests(TestCase, CookieTes def test_sendCookie(self): """ - L{cookielib.CookieJar.add_cookie_header} adds a cookie header to a fake + L{CookieJar.add_cookie_header} adds a cookie header to a fake urllib2 request instance. """ jar, (request, response) = self.makeCookieJar() @@ -1812,7 +1853,7 @@ class CookieAgentTests(TestCase, CookieT """ return client.CookieAgent( self.buildAgentForWrapperTest(self.reactor), - cookielib.CookieJar()) + CookieJar()) def setUp(self): @@ -1826,7 +1867,7 @@ class CookieAgentTests(TestCase, CookieT being requested. Cookies are extracted from the response and stored in the cookie jar. """ - cookieJar = cookielib.CookieJar() + cookieJar = CookieJar() self.assertEqual(list(cookieJar), []) agent = self.buildAgentForWrapperTest(self.reactor) @@ -1865,7 +1906,7 @@ class CookieAgentTests(TestCase, CookieT uri = 'http://example.com:1234/foo?bar' cookie = 'foo=1' - cookieJar = cookielib.CookieJar() + cookieJar = CookieJar() self.addCookies(cookieJar, uri, [cookie]) self.assertEqual(len(list(cookieJar)), 1) @@ -1885,7 +1926,7 @@ class CookieAgentTests(TestCase, CookieT uri = 'https://example.com:1234/foo?bar' cookie = 'foo=1;secure' - cookieJar = cookielib.CookieJar() + cookieJar = CookieJar() self.addCookies(cookieJar, uri, [cookie]) self.assertEqual(len(list(cookieJar)), 1) @@ -1905,7 +1946,7 @@ class CookieAgentTests(TestCase, CookieT uri = 'http://example.com/foo?bar' cookie = 'foo=1;secure' - cookieJar = cookielib.CookieJar() + cookieJar = CookieJar() self.addCookies(cookieJar, uri, [cookie]) self.assertEqual(len(list(cookieJar)), 1) @@ -1925,7 +1966,7 @@ class CookieAgentTests(TestCase, CookieT uri = 'https://example.com:1234/foo?bar' cookie = 'foo=1;port=1234' - cookieJar = cookielib.CookieJar() + cookieJar = CookieJar() self.addCookies(cookieJar, uri, [cookie]) self.assertEqual(len(list(cookieJar)), 1) @@ -1945,7 +1986,7 @@ class CookieAgentTests(TestCase, CookieT uri = 'https://example.com:4567/foo?bar' cookie = 'foo=1;port=1234' - cookieJar = cookielib.CookieJar() + cookieJar = CookieJar() self.addCookies(cookieJar, uri, [cookie]) self.assertEqual(len(list(cookieJar)), 0) @@ -2400,11 +2441,29 @@ class ProxyAgentTests(TestCase, FakeReac -class _RedirectAgentTestsMixin(object): +SENSITIVE_HEADERS = [ + b"authorization", + b"cookie", + b"cookie2", + b"proxy-authorization", + b"www-authenticate", +] + +if TYPE_CHECKING: + testMixinClass = TestCase +else: + testMixinClass = object + + +class _RedirectAgentTestsMixin(testMixinClass): """ Test cases mixin for L{RedirectAgentTests} and L{BrowserLikeRedirectAgentTests}. """ + agent: IAgent + reactor: MemoryReactorClock + protocol: StubHTTPProtocol + def test_noRedirect(self): """ L{client.RedirectAgent} behaves like L{client.Agent} if the response @@ -2423,25 +2482,58 @@ class _RedirectAgentTestsMixin(object): self.assertIdentical(response, result) self.assertIdentical(result.previousResponse, None) - - def _testRedirectDefault(self, code): + def _testRedirectDefault( + self, + code: int, + crossScheme: bool = False, + crossDomain: bool = False, + crossPort: bool = False, + requestHeaders: Optional[Headers] = None, + ) -> Request: """ When getting a redirect, L{client.RedirectAgent} follows the URL specified in the L{Location} header field and make a new request. @param code: HTTP status code. """ - self.agent.request('GET', 'http://example.com/foo') + startDomain = b"example.com" + startScheme = b"https" if ssl is not None else b"http" + startPort = 80 if startScheme == b"http" else 443 + self.agent.request( + b"GET", startScheme + b"://" + startDomain + b"/foo", headers=requestHeaders + ) host, port = self.reactor.tcpClients.pop()[:2] self.assertEqual("example.com", host) - self.assertEqual(80, port) + self.assertEqual(startPort, port) req, res = self.protocol.requests.pop() - headers = http_headers.Headers( - {'location': ['https://example.com/bar']}) - response = Response(('HTTP', 1, 1), code, 'OK', headers, None) + # If possible (i.e.: TLS support is present), run the test with a + # cross-scheme redirect to verify that the scheme is honored; if not, + # let's just make sure it works at all. + + targetScheme = startScheme + targetDomain = startDomain + targetPort = startPort + + if crossScheme: + if ssl is None: + raise SkipTest( + "Cross-scheme redirects can't be tested without TLS support." + ) + targetScheme = b"https" if startScheme == b"http" else b"http" + targetPort = 443 if startPort == 80 else 80 + + portSyntax = b"" + if crossPort: + targetPort = 8443 + portSyntax = b":8443" + targetDomain = b"example.net" if crossDomain else startDomain + locationValue = targetScheme + b"://" + targetDomain + portSyntax + b"/bar" + headers = http_headers.Headers({b"location": [locationValue]} + ) + response = Response((b'HTTP', 1, 1), code, b'OK', headers, None) res.callback(response) req2, res2 = self.protocol.requests.pop() @@ -2449,9 +2541,9 @@ class _RedirectAgentTestsMixin(object): self.assertEqual('/bar', req2.uri) host, port = self.reactor.sslClients.pop()[:2] - self.assertEqual("example.com", host) - self.assertEqual(443, port) - + self.assertEqual(EXAMPLE_NET_IP if crossDomain else EXAMPLE_COM_IP, host) + self.assertEqual(targetPort, port) + return req2 def test_redirect301(self): """ @@ -2688,19 +2780,24 @@ class _RedirectAgentTestsMixin(object): self.assertIdentical(redirectResponse.previousResponse, None) - -class RedirectAgentTests(TestCase, FakeReactorAndConnectMixin, - _RedirectAgentTestsMixin, AgentTestsMixin): +class RedirectAgentTests( + FakeReactorAndConnectMixin, + _RedirectAgentTestsMixin, + AgentTestsMixin, + runtimeTestCase +): """ Tests for L{client.RedirectAgent}. """ + def makeAgent(self): """ @return: a new L{twisted.web.client.RedirectAgent} """ return client.RedirectAgent( - self.buildAgentForWrapperTest(self.reactor)) - + self.buildAgentForWrapperTest(self.reactor), + sensitiveHeaderNames=[b"X-Custom-sensitive"], + ) def setUp(self): self.reactor = self.Reactor() @@ -2725,11 +2822,12 @@ class RedirectAgentTests(TestCase, FakeR self._testPageRedirectFailure(302, 'POST') - -class BrowserLikeRedirectAgentTests(TestCase, - FakeReactorAndConnectMixin, - _RedirectAgentTestsMixin, - AgentTestsMixin): +class BrowserLikeRedirectAgentTests( + FakeReactorAndConnectMixin, + _RedirectAgentTestsMixin, + AgentTestsMixin, + runtimeTestCase, +): """ Tests for L{client.BrowserLikeRedirectAgent}. """ @@ -2738,7 +2836,9 @@ class BrowserLikeRedirectAgentTests(Test @return: a new L{twisted.web.client.BrowserLikeRedirectAgent} """ return client.BrowserLikeRedirectAgent( - self.buildAgentForWrapperTest(self.reactor)) + self.buildAgentForWrapperTest(self.reactor), + sensitiveHeaderNames=[b"x-Custom-sensitive"], + ) def setUp(self):
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor