Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Lib/http/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@

DEFAULT_ERROR_CONTENT_TYPE = "text/html;charset=utf-8"

def _validate_header_string(value):
"""Validate header values preventing CRLF injection."""
if isinstance(value, str) and ('\r' in value or '\n' in value):
raise ValueError('Invalid header name/value: contains CR or LF')

class HTTPServer(socketserver.TCPServer):

allow_reuse_address = True # Seems to make sense in testing environment
Expand Down Expand Up @@ -547,12 +552,15 @@ def send_response_only(self, code, message=None):
message = ''
if not hasattr(self, '_headers_buffer'):
self._headers_buffer = []
_validate_header_string(message)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Validating a standard is redundant, isn't? Do this only for user specified message.

self._headers_buffer.append(("%s %d %s\r\n" %
(self.protocol_version, code, message)).encode(
'latin-1', 'strict'))

def send_header(self, keyword, value):
"""Send a MIME header to the headers buffer."""
_validate_header_string(keyword)
_validate_header_string(value)
if self.request_version != 'HTTP/0.9':
if not hasattr(self, '_headers_buffer'):
self._headers_buffer = []
Expand Down
39 changes: 39 additions & 0 deletions Lib/test/test_httpservers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1052,6 +1052,19 @@ def test_header_buffering_of_send_response_only(self):
handler.end_headers()
self.assertEqual(output.numWrites, 1)

def test_send_response_only_rejects_crlf_message(self):
input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
output = AuditableBytesIO()
handler = SocketlessRequestHandler()
handler.rfile = input
handler.wfile = output
handler.request_version = 'HTTP/1.1'

with self.assertRaises(ValueError) as ctx:
handler.send_response_only(418, 'value\r\nSet-Cookie: custom=true')
self.assertIn('Invalid header name/value: contains CR or LF',
str(ctx.exception))

def test_header_buffering_of_send_header(self):

input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
Expand All @@ -1068,6 +1081,32 @@ def test_header_buffering_of_send_header(self):
self.assertEqual(output.getData(), b'Foo: foo\r\nbar: bar\r\n\r\n')
self.assertEqual(output.numWrites, 1)

def test_crlf_injection_in_header_value(self):
input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
output = AuditableBytesIO()
handler = SocketlessRequestHandler()
handler.rfile = input
handler.wfile = output
handler.request_version = 'HTTP/1.1'

with self.assertRaises(ValueError) as ctx:
handler.send_header('X-Custom', 'value\r\nSet-Cookie: custom=true')
self.assertIn('Invalid header name/value: contains CR or LF',
str(ctx.exception))

def test_crlf_injection_in_header_name(self):
input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
output = AuditableBytesIO()
handler = SocketlessRequestHandler()
handler.rfile = input
handler.wfile = output
handler.request_version = 'HTTP/1.1'

with self.assertRaises(ValueError) as ctx:
handler.send_header('X-Inj\r\nSet-Cookie', 'value')
self.assertIn('Invalid header name/value: contains CR or LF',
str(ctx.exception))

def test_header_unbuffered_when_continue(self):

def _readAndReseek(f):
Expand Down
33 changes: 33 additions & 0 deletions Lib/test/test_wsgiref.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,39 @@ def testExtras(self):
'\r\n'
)

def test_crlf_rejection_in_setitem(self):
h = Headers()
for crlf in ('\r\n', '\r', '\n'):
with self.subTest(crlf_repr=repr(crlf)):
with self.assertRaises(ValueError) as ctx:
h['X-Custom'] = f'value{crlf}Set-Cookie: test'
self.assertIn('CR or LF', str(ctx.exception))

def test_crlf_rejection_in_setdefault(self):
for crlf in ('\r\n', '\r', '\n'):
with self.subTest(crlf_repr=repr(crlf)):
h = Headers()
with self.assertRaises(ValueError) as ctx:
h.setdefault('X-Custom', f'value{crlf}Set-Cookie: test')
self.assertIn('CR or LF', str(ctx.exception))

def test_crlf_rejection_in_add_header(self):
for crlf in ('\r\n', '\r', '\n'):
with self.subTest(location='value', crlf_repr=repr(crlf)):
h = Headers()
with self.assertRaises(ValueError) as ctx:
h.add_header('X-Custom', f'value{crlf}Set-Cookie: test')
self.assertIn('CR or LF', str(ctx.exception))

with self.subTest(location='param', crlf_repr=repr(crlf)):
h = Headers()
with self.assertRaises(ValueError) as ctx:
h.add_header('Content-Disposition',
'attachment',
filename=f'test{crlf}.txt')
self.assertIn('CR or LF', str(ctx.exception))


class ErrorHandler(BaseCGIHandler):
"""Simple handler subclass for testing BaseHandler"""

Expand Down
31 changes: 17 additions & 14 deletions Lib/wsgiref/headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ def __init__(self, headers=None):
self._headers = headers
if __debug__:
for k, v in headers:
self._convert_string_type(k)
self._convert_string_type(v)
self._validate_header_string(k)
self._validate_header_string(v)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this is debug-only check. It can be completely omitted.


def _convert_string_type(self, value):
"""Convert/check value type."""
def _validate_header_string(self, value):
"""Validate header type and value."""
if type(value) is str:
if '\r' in value or '\n' in value:
raise ValueError('Invalid header name/value: contains CR or LF')
return value
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you changed the method name, backward compatibility is not a concern. No need to return the value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This required a small refactor of the function code (i.e. no more early return) and a tiny change of all usages that relied on the return value

raise AssertionError("Header names/values must be"
" of type str (got {0})".format(repr(value)))
Expand All @@ -53,14 +55,15 @@ def __setitem__(self, name, val):
"""Set the value of a header."""
del self[name]
self._headers.append(
(self._convert_string_type(name), self._convert_string_type(val)))
(self._validate_header_string(name),
self._validate_header_string(val)))

def __delitem__(self,name):
"""Delete all occurrences of a header, if present.

Does *not* raise an exception if the header is missing.
"""
name = self._convert_string_type(name.lower())
name = self._validate_header_string(name.lower())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially this code was added to handle errors of misusing 8-bit strings and unicode strings. But now we have a single string type, and bytes cannot slip through so easy -- they will cause other errors.

self._headers[:] = [kv for kv in self._headers if kv[0].lower() != name]

def __getitem__(self,name):
Expand All @@ -87,13 +90,13 @@ def get_all(self, name):
fields deleted and re-inserted are always appended to the header list.
If no fields exist with the given name, returns an empty list.
"""
name = self._convert_string_type(name.lower())
name = self._validate_header_string(name.lower())
return [kv[1] for kv in self._headers if kv[0].lower()==name]


def get(self,name,default=None):
"""Get the first header value for 'name', or return 'default'"""
name = self._convert_string_type(name.lower())
name = self._validate_header_string(name.lower())
for k,v in self._headers:
if k.lower()==name:
return v
Expand Down Expand Up @@ -148,8 +151,8 @@ def setdefault(self,name,value):
and value 'value'."""
result = self.get(name)
if result is None:
self._headers.append((self._convert_string_type(name),
self._convert_string_type(value)))
self._headers.append((self._validate_header_string(name),
self._validate_header_string(value)))
return value
else:
return result
Expand All @@ -172,13 +175,13 @@ def add_header(self, _name, _value, **_params):
"""
parts = []
if _value is not None:
_value = self._convert_string_type(_value)
_value = self._validate_header_string(_value)
parts.append(_value)
for k, v in _params.items():
k = self._convert_string_type(k)
k = self._validate_header_string(k)
if v is None:
parts.append(k.replace('_', '-'))
else:
v = self._convert_string_type(v)
v = self._validate_header_string(v)
parts.append(_formatparam(k.replace('_', '-'), v))
self._headers.append((self._convert_string_type(_name), "; ".join(parts)))
self._headers.append((self._validate_header_string(_name), "; ".join(parts)))
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Reject CR/LF in HTTP headers in ``http.server`` and ``wsgiref.headers`` to
prevent CRLF injection attacks.
Loading