Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
SUSE:SLE-15-SP1:Update
python-waitress
CVE-2022-24761.patch
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File CVE-2022-24761.patch of Package python-waitress
Index: waitress-1.4.3/waitress/parser.py =================================================================== --- waitress-1.4.3.orig/waitress/parser.py +++ waitress-1.4.3/waitress/parser.py @@ -22,6 +22,7 @@ from io import BytesIO from waitress.buffers import OverflowableBuffer from waitress.compat import tostr, unquote_bytes_to_wsgi, urlparse from waitress.receiver import ChunkedReceiver, FixedStreamReceiver +from waitress.rfc7230 import HEADER_FIELD_RE, ONLY_DIGIT_RE from waitress.utilities import ( BadRequest, RequestEntityTooLarge, @@ -29,7 +30,6 @@ from waitress.utilities import ( ServerNotImplemented, find_double_newline, ) -from .rfc7230 import HEADER_FIELD class ParsingError(Exception): @@ -208,7 +208,7 @@ class HTTPRequestParser(object): headers = self.headers for line in lines: - header = HEADER_FIELD.match(line) + header = HEADER_FIELD_RE.match(line) if not header: raise ParsingError("Invalid header") @@ -298,11 +298,12 @@ class HTTPRequestParser(object): self.connection_close = True if not self.chunked: - try: - cl = int(headers.get("CONTENT_LENGTH", 0)) - except ValueError: + cl = headers.get("CONTENT_LENGTH", "0") + + if not ONLY_DIGIT_RE.match(cl.encode("latin-1")): raise ParsingError("Content-Length is invalid") + cl = int(cl) self.content_length = cl if cl > 0: buf = OverflowableBuffer(self.adj.inbuf_overflow) Index: waitress-1.4.3/waitress/receiver.py =================================================================== --- waitress-1.4.3.orig/waitress/receiver.py +++ waitress-1.4.3/waitress/receiver.py @@ -14,6 +14,7 @@ """Data Chunk Receiver """ +from waitress.rfc7230 import CHUNK_EXT_RE, ONLY_HEXDIG_RE from waitress.utilities import BadRequest, find_double_newline @@ -110,6 +111,7 @@ class ChunkedReceiver(object): s = b"" else: self.chunk_end = b"" + if pos == 0: # Chop off the terminating CR LF from the chunk s = s[2:] @@ -133,20 +135,32 @@ class ChunkedReceiver(object): line = s[:pos] s = s[pos + 2 :] self.control_line = b"" - line = line.strip() if line: # Begin a new chunk. semi = line.find(b";") if semi >= 0: - # discard extension info. + extinfo = line[semi:] + valid_ext_info = CHUNK_EXT_RE.match(extinfo) + + if not valid_ext_info: + self.error = BadRequest("Invalid chunk extension") + self.all_chunks_received = True + + break + line = line[:semi] - try: - sz = int(line.strip(), 16) # hexadecimal - except ValueError: # garbage in input - self.error = BadRequest("garbage in chunked encoding input") - sz = 0 + + if not ONLY_HEXDIG_RE.match(line): + self.error = BadRequest("Invalid chunk size") + self.all_chunks_received = True + + break + + # Can not fail due to matching against the regular + # expression above + sz = int(line, 16) # hexadecimal if sz > 0: # Start a new chunk. Index: waitress-1.4.3/waitress/rfc7230.py =================================================================== --- waitress-1.4.3.orig/waitress/rfc7230.py +++ waitress-1.4.3/waitress/rfc7230.py @@ -7,6 +7,9 @@ import re from .compat import tobytes +HEXDIG = "[0-9a-fA-F]" +DIGIT = "[0-9]" + WS = "[ \t]" OWS = WS + "{0,}?" RWS = WS + "{1,}?" @@ -27,6 +30,12 @@ TOKEN = TCHAR + "{1,}" # ; visible (printing) characters VCHAR = r"\x21-\x7e" +# The '\\' between \x5b and \x5d is needed to escape \x5d (']') +QDTEXT = "[\t \x21\x23-\x5b\\\x5d-\x7e" + OBS_TEXT + "]" + +QUOTED_PAIR = r"\\" + "([\t " + VCHAR + OBS_TEXT + "])" +QUOTED_STRING = '"(?:(?:' + QDTEXT + ")|(?:" + QUOTED_PAIR + '))*"' + # header-field = field-name ":" OWS field-value OWS # field-name = token # field-value = *( field-content / obs-fold ) @@ -45,8 +54,24 @@ FIELD_CONTENT = FIELD_VCHAR + "+(?:[ \t] # Which allows the field value here to just see if there is even a value in the first place FIELD_VALUE = "(?:" + FIELD_CONTENT + ")?" -HEADER_FIELD = re.compile( - tobytes( +# chunk-ext = *( ";" chunk-ext-name [ "=" chunk-ext-val ] +# chunk-ext-name = token +# chunk-ext-val = token / quoted-string + +CHUNK_EXT_NAME = TOKEN +CHUNK_EXT_VAL = "(?:" + TOKEN + ")|(?:" + QUOTED_STRING + ")" +CHUNK_EXT = ( + "(?:;(?P<extension>" + CHUNK_EXT_NAME + ")(?:=(?P<value>" + CHUNK_EXT_VAL + "))?)*" +) + +# Pre-compiled regular expressions for use elsewhere +ONLY_HEXDIG_RE = re.compile(("^" + HEXDIG + "+$").encode("latin-1")) +ONLY_DIGIT_RE = re.compile(("^" + DIGIT + "+$").encode("latin-1")) +HEADER_FIELD_RE = re.compile( + ( "^(?P<name>" + TOKEN + "):" + OWS + "(?P<value>" + FIELD_VALUE + ")" + OWS + "$" - ) + ).encode("latin-1") ) +QUOTED_PAIR_RE = re.compile(QUOTED_PAIR) +QUOTED_STRING_RE = re.compile(QUOTED_STRING) +CHUNK_EXT_RE = re.compile(("^" + CHUNK_EXT + "$").encode("latin-1")) Index: waitress-1.4.3/waitress/tests/test_functional.py =================================================================== --- waitress-1.4.3.orig/waitress/tests/test_functional.py +++ waitress-1.4.3/waitress/tests/test_functional.py @@ -301,7 +301,7 @@ class EchoTests(object): self.assertFalse("transfer-encoding" in headers) def test_chunking_request_with_content(self): - control_line = b"20;\r\n" # 20 hex = 32 dec + control_line = b"20\r\n" # 20 hex = 32 dec s = b"This string has 32 characters.\r\n" expected = s * 12 header = tobytes("GET / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n") @@ -320,7 +320,7 @@ class EchoTests(object): self.assertFalse("transfer-encoding" in headers) def test_broken_chunked_encoding(self): - control_line = "20;\r\n" # 20 hex = 32 dec + control_line = "20\r\n" # 20 hex = 32 dec s = "This string has 32 characters.\r\n" to_send = "GET / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n" to_send += control_line + s + "\r\n" @@ -343,8 +343,59 @@ class EchoTests(object): self.send_check_error(to_send) self.assertRaises(ConnectionClosed, read_http, fp) + def test_broken_chunked_encoding_invalid_hex(self): + control_line = b"0x20\r\n" # 20 hex = 32 dec + s = b"This string has 32 characters.\r\n" + to_send = b"GET / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n" + to_send += control_line + s + b"\r\n" + self.connect() + self.sock.send(to_send) + fp = self.sock.makefile("rb", 0) + + line, headers, response_body = read_http(fp) + self.assertline(line, "400", "Bad Request", "HTTP/1.1") + cl = int(headers["content-length"]) + self.assertEqual(cl, len(response_body)) + self.assertIn(b"Invalid chunk size", response_body) + self.assertEqual( + sorted(headers.keys()), + ["connection", "content-length", "content-type", "date", "server"], + ) + self.assertEqual(headers["content-type"], "text/plain") + # connection has been closed + self.send_check_error(to_send) + self.assertRaises(ConnectionClosed, read_http, fp) + + fp.close() + + def test_broken_chunked_encoding_invalid_extension(self): + control_line = b"20;invalid=\r\n" # 20 hex = 32 dec + s = b"This string has 32 characters.\r\n" + to_send = b"GET / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n" + to_send += control_line + s + b"\r\n" + self.connect() + self.sock.send(to_send) + + fp = self.sock.makefile("rb", 0) + + line, headers, response_body = read_http(fp) + self.assertline(line, "400", "Bad Request", "HTTP/1.1") + cl = int(headers["content-length"]) + self.assertEqual(cl, len(response_body)) + self.assertIn(b"Invalid chunk extension", response_body) + self.assertEqual( + sorted(headers.keys()), + ["connection", "content-length", "content-type", "date", "server"], + ) + self.assertEqual(headers["content-type"], "text/plain") + # connection has been closed + self.send_check_error(to_send) + self.assertRaises(ConnectionClosed, read_http, fp) + + fp.close() + def test_broken_chunked_encoding_missing_chunk_end(self): - control_line = "20;\r\n" # 20 hex = 32 dec + control_line = "20\r\n" # 20 hex = 32 dec s = "This string has 32 characters.\r\n" to_send = "GET / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n" to_send += control_line + s Index: waitress-1.4.3/waitress/tests/test_parser.py =================================================================== --- waitress-1.4.3.orig/waitress/tests/test_parser.py +++ waitress-1.4.3/waitress/tests/test_parser.py @@ -152,7 +152,7 @@ class TestHTTPRequestParser(unittest.Tes b"Transfer-Encoding: chunked\r\n" b"X-Foo: 1\r\n" b"\r\n" - b"1d;\r\n" + b"1d\r\n" b"This string has 29 characters\r\n" b"0\r\n\r\n" ) @@ -182,6 +182,31 @@ class TestHTTPRequestParser(unittest.Tes else: # pragma: nocover self.assertTrue(False) + def test_parse_header_bad_content_length_plus(self): + from waitress.parser import ParsingError + + data = b"GET /foobar HTTP/8.4\r\ncontent-length: +10\r\n" + + try: + self.parser.parse_header(data) + except ParsingError as e: + self.assertIn("Content-Length is invalid", e.args[0]) + else: # pragma: nocover + self.assertTrue(False) + + def test_parse_header_bad_content_length_minus(self): + from waitress.parser import ParsingError + + data = b"GET /foobar HTTP/8.4\r\ncontent-length: -10\r\n" + + try: + self.parser.parse_header(data) + except ParsingError as e: + self.assertIn("Content-Length is invalid", e.args[0]) + else: # pragma: nocover + self.assertTrue(False) + + def test_parse_header_bad_content_length(self): from waitress.parser import ParsingError Index: waitress-1.4.3/waitress/tests/test_receiver.py =================================================================== --- waitress-1.4.3.orig/waitress/tests/test_receiver.py +++ waitress-1.4.3/waitress/tests/test_receiver.py @@ -1,5 +1,7 @@ import unittest +import pytest + class TestFixedStreamReceiver(unittest.TestCase): def _makeOne(self, cl, buf): @@ -48,6 +50,65 @@ class TestFixedStreamReceiver(unittest.T inst = self._makeOne(10, buf) self.assertEqual(inst.__len__(), 2) +class TestChunkedReceiverParametrized: + def _makeOne(self, buf): + from waitress.receiver import ChunkedReceiver + + return ChunkedReceiver(buf) + + @pytest.mark.parametrize( + "invalid_extension", [b"\n", b"invalid=", b"\r", b"invalid = true"] + ) + def test_received_invalid_extensions(self, invalid_extension): + from waitress.utilities import BadRequest + + buf = DummyBuffer() + inst = self._makeOne(buf) + data = b"4;" + invalid_extension + b"\r\ntest\r\n" + result = inst.received(data) + assert result == len(data) + assert inst.error.__class__ == BadRequest + assert inst.error.body == "Invalid chunk extension" + + @pytest.mark.parametrize( + "valid_extension", [b"test", b"valid=true", b"valid=true;other=true"] + ) + def test_received_valid_extensions(self, valid_extension): + from waitress.utilities import BadRequest + + buf = DummyBuffer() + inst = self._makeOne(buf) + data = b"4;" + invalid_extension + b"\r\ntest\r\n" + result = inst.received(data) + assert result == len(data) + assert inst.error.__class__ == BadRequest + assert inst.error.body == "Invalid chunk extension" + + @pytest.mark.parametrize( + "valid_extension", [b"test", b"valid=true", b"valid=true;other=true"] + ) + def test_received_valid_extensions(self, valid_extension): + # While waitress may ignore extensions in Chunked Encoding, we do want + # to make sure that we don't fail when we do encounter one that is + # valid + buf = DummBuffer() + inst = self._makeOne(buf) + data = b"4;" + valid_extension + b"\r\ntest\r\n" + result = inst.received(data) + assert result == len(data) + assert inst.error == None + + @pytest.mark.parametrize( + "invalid_size", [b"0x04", b"+0x04", b"x04", b"+04", b" 04", b" 0x04"] + ) + def test_received_invalid_size(self, invalid_size): + from waitress.utilities import BadReques + + buf = DummyBuffer() + inst = self._makeOne(buf) + data = invalid_size + b"\r\ntest\r\n" + result = inst.received(data) + assert result == len(data) class TestChunkedReceiver(unittest.TestCase): def _makeOne(self, buf): Index: waitress-1.4.3/waitress/utilities.py =================================================================== --- waitress-1.4.3.orig/waitress/utilities.py +++ waitress-1.4.3/waitress/utilities.py @@ -22,7 +22,7 @@ import re import stat import time -from .rfc7230 import OBS_TEXT, VCHAR +from .rfc7230 import QUOTED_PAIR_RE, QUOTED_STRING_RE logger = logging.getLogger("waitress") queue_logger = logging.getLogger("waitress.queue") @@ -216,32 +216,10 @@ def parse_http_date(d): return retval -# RFC 5234 Appendix B.1 "Core Rules": -# VCHAR = %x21-7E -# ; visible (printing) characters -vchar_re = VCHAR - -# RFC 7230 Section 3.2.6 "Field Value Components": -# quoted-string = DQUOTE *( qdtext / quoted-pair ) DQUOTE -# qdtext = HTAB / SP /%x21 / %x23-5B / %x5D-7E / obs-text -# obs-text = %x80-FF -# quoted-pair = "\" ( HTAB / SP / VCHAR / obs-text ) -obs_text_re = OBS_TEXT - -# The '\\' between \x5b and \x5d is needed to escape \x5d (']') -qdtext_re = "[\t \x21\x23-\x5b\\\x5d-\x7e" + obs_text_re + "]" - -quoted_pair_re = r"\\" + "([\t " + vchar_re + obs_text_re + "])" -quoted_string_re = '"(?:(?:' + qdtext_re + ")|(?:" + quoted_pair_re + '))*"' - -quoted_string = re.compile(quoted_string_re) -quoted_pair = re.compile(quoted_pair_re) - - def undquote(value): if value.startswith('"') and value.endswith('"'): # So it claims to be DQUOTE'ed, let's validate that - matches = quoted_string.match(value) + matches = QUOTED_STRING_RE.match(value) if matches and matches.end() == len(value): # Remove the DQUOTE's from the value @@ -249,7 +227,7 @@ def undquote(value): # Remove all backslashes that are followed by a valid vchar or # obs-text - value = quoted_pair.sub(r"\1", value) + value = QUOTED_PAIR_RE.sub(r"\1", value) return value elif not value.startswith('"') and not value.endswith('"'):
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor