Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:mnhauke:fedmsg
python-txWS
tests.py
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File tests.py of Package python-txWS
# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

import six

from twisted.trial import unittest

from txws import (is_hybi00, complete_hybi00, make_hybi00_frame,
                  parse_hybi00_frames, http_headers, make_accept, mask, CLOSE,
                  NORMAL, PING, PONG, parse_hybi07_frames)


class TestHTTPHeaders(unittest.TestCase):
    """
    Tests for ``http_headers()``, which turns raw header text into a mapping
    of header names to values.
    """

    def test_single_header(self):
        """
        A lone header line is parsed into one name/value pair.
        """

        raw = "Connection: Upgrade"
        headers = http_headers(raw)
        # assertIn reports the container's contents on failure, unlike
        # assertTrue(x in y).
        self.assertIn("Connection", headers)
        self.assertEqual(headers["Connection"], "Upgrade")

    def test_single_header_newline(self):
        """
        A trailing CRLF does not end up in the header value.
        """

        raw = "Connection: Upgrade\r\n"
        headers = http_headers(raw)
        self.assertEqual(headers["Connection"], "Upgrade")

    def test_multiple_headers(self):
        """
        CRLF-separated header lines are each parsed.
        """

        raw = "Connection: Upgrade\r\nUpgrade: WebSocket"
        headers = http_headers(raw)
        self.assertEqual(headers["Connection"], "Upgrade")
        self.assertEqual(headers["Upgrade"], "WebSocket")

    def test_origin_colon(self):
        """
        Some headers have multiple colons in them.
        """

        raw = "Origin: http://example.com:8080"
        headers = http_headers(raw)
        self.assertEqual(headers["Origin"], "http://example.com:8080")


class TestKeys(unittest.TestCase):
    """
    Tests for ``make_accept()`` against published key/accept pairs.
    """

    def test_make_accept_rfc(self):
        """
        Test ``make_accept()`` using the keys listed in the RFC for HyBi-07
        through HyBi-10.
        """

        key = "dGhlIHNhbXBsZSBub25jZQ=="

        self.assertEqual(make_accept(key), "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=")

    def test_make_accept_wikipedia(self):
        """
        Test ``make_accept()`` using the keys listed on Wikipedia.
        """

        key = "x3JJHMbDL1EzLkh9GBhXDw=="

        self.assertEqual(make_accept(key), "HSmrc0sMlYUkAGmm5OPpG2HaGWk=")


class TestHyBi00(unittest.TestCase):
    """
    Tests for HyBi-00 detection, handshake completion, and framing.
    """

    def test_is_hybi00(self):
        """
        Headers carrying both Key1 and Key2 are recognized as HyBi-00.
        """

        headers = {
            "Sec-WebSocket-Key1": "hurp",
            "Sec-WebSocket-Key2": "derp",
        }

        self.assertTrue(is_hybi00(headers))

    def test_is_hybi00_false(self):
        """
        Headers carrying only Key1 are not recognized as HyBi-00.
        """

        headers = {
            "Sec-WebSocket-Key1": "hurp",
        }

        self.assertFalse(is_hybi00(headers))

    def test_complete_hybi00_wikipedia(self):
        """
        Test complete_hybi00() using the keys listed on Wikipedia's
        WebSockets page.
        """

        # The doubled spaces are significant: the hixie-76/HyBi-00 handshake
        # divides the digits of each key by the number of spaces in it, and
        # the published example has five spaces per key. Do not let an
        # editor or formatter collapse them.
        headers = {
            "Sec-WebSocket-Key1": "4 @1  46546xW%0l 1 5",
            "Sec-WebSocket-Key2": "12998 5 Y3 1  .P00",
        }
        challenge = "^n:ds[4U"

        self.assertEqual(complete_hybi00(headers, challenge),
                         six.b("8jKS'y:G*Co,Wxa-"))

    def test_make_hybi00(self):
        """
        HyBi-00 frames are really, *really* simple.
        """

        self.assertEqual(make_hybi00_frame("Test!"), six.b("\x00Test!\xff"))

    def test_parse_hybi00_single(self):
        """
        One complete frame is parsed and nothing is left in the buffer.
        """

        frame = six.b("\x00Test\xff")

        frames, buf = parse_hybi00_frames(frame)

        self.assertEqual(len(frames), 1)
        self.assertEqual(frames[0], (NORMAL, six.b("Test")))
        self.assertEqual(buf, six.b(""))

    def test_parse_hybi00_multiple(self):
        """
        Back-to-back frames are parsed in order.
        """

        frame = six.b("\x00Test\xff\x00Again\xff")

        frames, buf = parse_hybi00_frames(frame)

        self.assertEqual(len(frames), 2)
        self.assertEqual(frames[0], (NORMAL, six.b("Test")))
        self.assertEqual(frames[1], (NORMAL, six.b("Again")))
        self.assertEqual(buf, six.b(""))

    def test_parse_hybi00_incomplete(self):
        """
        An unterminated frame yields no frames and stays in the buffer.
        """

        frame = six.b("\x00Test")

        frames, buf = parse_hybi00_frames(frame)

        self.assertEqual(len(frames), 0)
        self.assertEqual(buf, six.b("\x00Test"))

    def test_parse_hybi00_garbage(self):
        """
        Bytes preceding the first frame are dropped rather than buffered.
        """

        frame = six.b("trash\x00Test\xff")

        frames, buf = parse_hybi00_frames(frame)

        self.assertEqual(len(frames), 1)
        self.assertEqual(frames[0], (NORMAL, six.b("Test")))
        self.assertEqual(buf, six.b(""))

    def test_socketio_crashers(self):
        """
        A series of snippets which crash other WebSockets implementations
        (specifically, Socket.IO) are harmless to this implementation.
        """

        crashers = [
            """[{"length":1}]""",
            """{"messages":[{"length":1}]}""",
            "hello",
            "hello<script>alert(/xss/)</script>",
            "hello<img src=x:x onerror=alert(/xss.2/)>",
            "{",
            "~m~EVJLFDJP~",
        ]

        # The list of inputs and the parse result deliberately use distinct
        # names so the loop never rebinds the sequence it is iterating over.
        for crasher in crashers:
            prepared = make_hybi00_frame(crasher)
            frames, buf = parse_hybi00_frames(prepared)

            self.assertEqual(len(frames), 1)
            self.assertEqual(frames[0], (NORMAL, crasher.encode('utf-8')))
            self.assertEqual(buf, six.b(""))


class TestHyBi07Helpers(unittest.TestCase):
    """
    HyBi-07 is best understood as a large family of helper functions which
    work together, somewhat dysfunctionally, to produce a mediocre
    Thanksgiving every other year.
    """

    def test_mask_noop(self):
        """
        Masking with an all-zero key leaves the data unchanged.
        """

        key = six.b("\x00\x00\x00\x00")
        self.assertEqual(mask(six.b("Test"), key), six.b("Test"))

    def test_mask_noop_long(self):
        """
        An all-zero key is also a no-op on data longer than the key.
        """

        key = six.b("\x00\x00\x00\x00")
        self.assertEqual(mask(six.b("LongTest"), key), six.b("LongTest"))

    def test_parse_hybi07_unmasked_text(self):
        """
        From HyBi-10, 4.7.
        """

        frame = six.b("\x81\x05Hello")

        frames, buf = parse_hybi07_frames(frame)

        self.assertEqual(len(frames), 1)
        self.assertEqual(frames[0], (NORMAL, six.b("Hello")))
        self.assertEqual(buf, six.b(""))

    def test_parse_hybi07_masked_text(self):
        """
        From HyBi-10, 4.7.
        """

        frame = six.b("\x81\x857\xfa!=\x7f\x9fMQX")

        frames, buf = parse_hybi07_frames(frame)

        self.assertEqual(len(frames), 1)
        self.assertEqual(frames[0], (NORMAL, six.b("Hello")))
        self.assertEqual(buf, six.b(""))

    def test_parse_hybi07_unmasked_text_fragments(self):
        """
        We don't care about fragments. We are totally unfazed.

        From HyBi-10, 4.7.
        """

        frame = six.b("\x01\x03Hel\x80\x02lo")

        frames, buf = parse_hybi07_frames(frame)

        self.assertEqual(len(frames), 2)
        self.assertEqual(frames[0], (NORMAL, six.b("Hel")))
        self.assertEqual(frames[1], (NORMAL, six.b("lo")))
        self.assertEqual(buf, six.b(""))

    def test_parse_hybi07_ping(self):
        """
        From HyBi-10, 4.7.
        """

        frame = six.b("\x89\x05Hello")

        frames, buf = parse_hybi07_frames(frame)

        self.assertEqual(len(frames), 1)
        self.assertEqual(frames[0], (PING, six.b("Hello")))
        self.assertEqual(buf, six.b(""))

    def test_parse_hybi07_pong(self):
        """
        From HyBi-10, 4.7.
        """

        frame = six.b("\x8a\x05Hello")

        frames, buf = parse_hybi07_frames(frame)

        self.assertEqual(len(frames), 1)
        self.assertEqual(frames[0], (PONG, six.b("Hello")))
        self.assertEqual(buf, six.b(""))

    def test_parse_hybi07_close_empty(self):
        """
        A HyBi-07 close packet may have no body. In that case, the parser
        should report the generic code 1000 together with the placeholder
        reason ``No reason given``.
        """

        frame = six.b("\x88\x00")

        frames, buf = parse_hybi07_frames(frame)

        self.assertEqual(len(frames), 1)
        self.assertEqual(frames[0],
                         (CLOSE, (1000, six.b("No reason given"))))
        self.assertEqual(buf, six.b(""))

    def test_parse_hybi07_close_reason(self):
        """
        A HyBi-07 close packet must have its first two bytes be a numeric
        error code, and may optionally include trailing text explaining why
        the connection was closed.
        """

        frame = six.b("\x88\x0b\x03\xe8No reason")

        frames, buf = parse_hybi07_frames(frame)

        self.assertEqual(len(frames), 1)
        self.assertEqual(frames[0], (CLOSE, (1000, six.b("No reason"))))
        self.assertEqual(buf, six.b(""))

    def test_parse_hybi07_partial_no_length(self):
        """
        A lone first byte yields no frames and stays in the buffer.
        """

        frame = six.b("\x81")

        frames, buf = parse_hybi07_frames(frame)

        self.assertFalse(frames)
        self.assertEqual(buf, six.b("\x81"))

    def test_parse_hybi07_partial_truncated_length_int(self):
        """
        A frame cut off right after the ``\\xfe`` length byte yields no
        frames and stays in the buffer.
        """

        frame = six.b("\x81\xfe")

        frames, buf = parse_hybi07_frames(frame)

        self.assertFalse(frames)
        self.assertEqual(buf, six.b("\x81\xfe"))

    def test_parse_hybi07_partial_truncated_length_double(self):
        """
        A frame cut off right after the ``\\xff`` length byte yields no
        frames and stays in the buffer.
        """

        frame = six.b("\x81\xff")

        frames, buf = parse_hybi07_frames(frame)

        self.assertFalse(frames)
        self.assertEqual(buf, six.b("\x81\xff"))

    def test_parse_hybi07_partial_no_data(self):
        """
        A complete header with no payload bytes yet yields no frames and
        stays in the buffer.
        """

        frame = six.b("\x81\x05")

        frames, buf = parse_hybi07_frames(frame)

        self.assertFalse(frames)
        self.assertEqual(buf, six.b("\x81\x05"))

    def test_parse_hybi07_partial_truncated_data(self):
        """
        A frame with only part of its declared payload yields no frames and
        stays in the buffer.
        """

        frame = six.b("\x81\x05Hel")

        frames, buf = parse_hybi07_frames(frame)

        self.assertFalse(frames)
        self.assertEqual(buf, six.b("\x81\x05Hel"))
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor