File use-httpx.patch of Package python-apns2
From 64d8465a4ebb7af3b7be553fb5bbfde11935a77e Mon Sep 17 00:00:00 2001
From: Simon Morgan <smorgan@brainomix.com>
Date: Thu, 4 Aug 2022 12:32:21 +0100
Subject: [PATCH 1/2] Es 7755 (#1)

* ES-7755 Simple HTTPX implementation

* ES-7755 Add http2 extras to httpx
---
 apns2/client.py      | 181 ++++++++++---------------------------------
 apns2/credentials.py |  26 +------
 pyproject.toml       |   4 +-
 3 files changed, 45 insertions(+), 166 deletions(-)

Index: apns2-0.7.2/apns2/client.py
===================================================================
--- apns2-0.7.2.orig/apns2/client.py
+++ apns2-0.7.2/apns2/client.py
@@ -1,15 +1,12 @@
 import collections
+import httpx
 import json
 import logging
-import time
-import typing
-import weakref
 from enum import Enum
-from threading import Thread
 from typing import Dict, Iterable, Optional, Tuple, Union
 
-from .credentials import CertificateCredentials, Credentials
-from .errors import ConnectionFailed, exception_class_for_reason
+from .credentials import Credentials, CertificateCredentials, TokenCredentials
+from .errors import exception_class_for_reason
 # We don't generally need to know about the Credentials subclasses except to
 # keep the old API, where APNsClient took a cert_file
 from .payload import Payload
@@ -29,7 +26,7 @@ class NotificationType(Enum):
     MDM = 'mdm'
 
 
-RequestStream = collections.namedtuple('RequestStream', ['stream_id', 'token'])
+RequestStream = collections.namedtuple('RequestStream', ['token', 'status', 'reason'])
 Notification = collections.namedtuple('Notification', ['token', 'payload'])
 
 DEFAULT_APNS_PRIORITY = NotificationPriority.Immediate
@@ -52,57 +49,39 @@ class APNsClient(object):
                  json_encoder: Optional[type] = None, password: Optional[str] = None,
                  proxy_host: Optional[str] = None, proxy_port: Optional[int] = None,
                  heartbeat_period: Optional[float] = None) -> None:
+
         if isinstance(credentials, str):
-            self.__credentials = CertificateCredentials(credentials, password)  # type: Credentials
+            self.__credentials = CertificateCredentials(credentials, password)
         else:
             self.__credentials = credentials
+
         self._init_connection(use_sandbox, use_alternative_port, proto, proxy_host, proxy_port)
 
         if heartbeat_period:
-            self._start_heartbeat(heartbeat_period)
+            raise NotImplementedError("heartbeat not supported")
 
         self.__json_encoder = json_encoder
-        self.__max_concurrent_streams = 0
-        self.__previous_server_max_concurrent_streams = None
 
     def _init_connection(self, use_sandbox: bool, use_alternative_port: bool, proto: Optional[str],
                          proxy_host: Optional[str], proxy_port: Optional[int]) -> None:
-        server = self.SANDBOX_SERVER if use_sandbox else self.LIVE_SERVER
-        port = self.ALTERNATIVE_PORT if use_alternative_port else self.DEFAULT_PORT
-        self._connection = self.__credentials.create_connection(server, port, proto, proxy_host, proxy_port)
-
-    def _start_heartbeat(self, heartbeat_period: float) -> None:
-        conn_ref = weakref.ref(self._connection)
-
-        def watchdog() -> None:
-            while True:
-                conn = conn_ref()
-                if conn is None:
-                    break
-
-                conn.ping('-' * 8)
-                time.sleep(heartbeat_period)
-
-        thread = Thread(target=watchdog)
-        thread.setDaemon(True)
-        thread.start()
+        self.__server = self.SANDBOX_SERVER if use_sandbox else self.LIVE_SERVER
+        self.__port = self.ALTERNATIVE_PORT if use_alternative_port else self.DEFAULT_PORT
 
     def send_notification(self, token_hex: str, notification: Payload, topic: Optional[str] = None,
                           priority: NotificationPriority = NotificationPriority.Immediate,
                           expiration: Optional[int] = None, collapse_id: Optional[str] = None) -> None:
-        stream_id = self.send_notification_async(token_hex, notification, topic, priority, expiration, collapse_id)
-        result = self.get_notification_result(stream_id)
-        if result != 'Success':
-            if isinstance(result, tuple):
-                reason, info = result
-                raise exception_class_for_reason(reason)(info)
-            else:
-                raise exception_class_for_reason(result)
-
-    def send_notification_async(self, token_hex: str, notification: Payload, topic: Optional[str] = None,
-                                priority: NotificationPriority = NotificationPriority.Immediate,
-                                expiration: Optional[int] = None, collapse_id: Optional[str] = None,
-                                push_type: Optional[NotificationType] = None) -> int:
+        with httpx.Client(http2=True) as client:
+            status, reason = self.send_notification_sync(token_hex, notification, client, topic, priority, expiration,
+                                                         collapse_id)
+
+        if status != 200:
+            raise exception_class_for_reason(reason)
+
+    def send_notification_sync(self, token_hex: str, notification: Payload, client: httpx.Client,
+                               topic: Optional[str] = None,
+                               priority: NotificationPriority = NotificationPriority.Immediate,
+                               expiration: Optional[int] = None, collapse_id: Optional[str] = None,
+                               push_type: Optional[NotificationType] = None) -> int:
         json_str = json.dumps(notification.dict(), cls=self.__json_encoder, ensure_ascii=False,
                               separators=(',', ':'))
         json_payload = json_str.encode('utf-8')
@@ -138,128 +117,57 @@ class APNsClient(object):
         if expiration is not None:
             headers['apns-expiration'] = '%d' % expiration
 
-        auth_header = self.__credentials.get_authorization_header(topic)
-        if auth_header is not None:
-            headers['authorization'] = auth_header
+        if isinstance(self.__credentials, TokenCredentials):
+            auth_header = self.__credentials.get_authorization_header(topic)
+            if auth_header is not None:
+                headers['authorization'] = auth_header
 
         if collapse_id is not None:
             headers['apns-collapse-id'] = collapse_id
 
-        url = '/3/device/{}'.format(token_hex)
-        stream_id = self._connection.request('POST', url, json_payload, headers)  # type: int
-        return stream_id
+        url = f'https://{self.__server}:{self.__port}/3/device/{token_hex}'
+        response = client.post(url, headers=headers, data=json_payload)
+        return response.status_code, response.text
 
-    def get_notification_result(self, stream_id: int) -> Union[str, Tuple[str, str]]:
+    def get_notification_result(self, status: int, reason: str) -> Union[str, Tuple[str, str]]:
         """
         Get result for specified stream
-        The function returns: 'Success' or 'failure reason' or ('Unregistered', timestamp)
+        The function returns: 'Success' or 'failure reason'
         """
-        with self._connection.get_response(stream_id) as response:
-            if response.status == 200:
-                return 'Success'
-            else:
-                raw_data = response.read().decode('utf-8')
-                data = json.loads(raw_data)  # type: Dict[str, str]
-                if response.status == 410:
-                    return data['reason'], data['timestamp']
-                else:
-                    return data['reason']
+        if status == 200:
+            return 'Success'
+        else:
+            return reason
 
     def send_notification_batch(self, notifications: Iterable[Notification], topic: Optional[str] = None,
                                 priority: NotificationPriority = NotificationPriority.Immediate,
                                 expiration: Optional[int] = None, collapse_id: Optional[str] = None,
                                 push_type: Optional[NotificationType] = None) -> Dict[str, Union[str, Tuple[str, str]]]:
         """
-        Send a notification to a list of tokens in batch. Instead of sending a synchronous request
-        for each token, send multiple requests concurrently. This is done on the same connection,
-        using HTTP/2 streams (one request per stream).
-
-        APNs allows many streams simultaneously, but the number of streams can vary depending on
-        server load. This method reads the SETTINGS frame sent by the server to figure out the
-        maximum number of concurrent streams. Typically, APNs reports a maximum of 500.
+        Send a notification to a list of tokens in batch.
 
         The function returns a dictionary mapping each token to its result. The result is "Success"
        if the token was sent successfully, or the string returned by APNs in the 'reason' field of
         the response, if the token generated an error.
         """
-        notification_iterator = iter(notifications)
-        next_notification = next(notification_iterator, None)
-        # Make sure we're connected to APNs, so that we receive and process the server's SETTINGS
-        # frame before starting to send notifications.
-        self.connect()
-
         results = {}
-        open_streams = collections.deque()  # type: typing.Deque[RequestStream]
-        # Loop on the tokens, sending as many requests as possible concurrently to APNs.
-        # When reaching the maximum concurrent streams limit, wait for a response before sending
-        # another request.
-        while len(open_streams) > 0 or next_notification is not None:
-            # Update the max_concurrent_streams on every iteration since a SETTINGS frame can be
-            # sent by the server at any time.
-            self.update_max_concurrent_streams()
-            if next_notification is not None and len(open_streams) < self.__max_concurrent_streams:
+
+        # Loop over notifications
+        with httpx.Client(http2=True, verify=self.__credentials.ssl_context) as client:
+            for next_notification in notifications:
                 logger.info('Sending to token %s', next_notification.token)
-                stream_id = self.send_notification_async(next_notification.token, next_notification.payload, topic,
-                                                         priority, expiration, collapse_id, push_type)
-                open_streams.append(RequestStream(stream_id, next_notification.token))
-
-                next_notification = next(notification_iterator, None)
-                if next_notification is None:
-                    # No tokens remaining. Proceed to get results for pending requests.
-                    logger.info('Finished sending all tokens, waiting for pending requests.')
-            else:
-                # We have at least one request waiting for response (otherwise we would have either
-                # sent new requests or exited the while loop.) Wait for the first outstanding stream
-                # to return a response.
-                pending_stream = open_streams.popleft()
-                result = self.get_notification_result(pending_stream.stream_id)
-                logger.info('Got response for %s: %s', pending_stream.token, result)
-                results[pending_stream.token] = result
+                status, reason = self.send_notification_sync(next_notification.token, next_notification.payload, client,
+                                                             topic, priority, expiration, collapse_id, push_type)
+                result = self.get_notification_result(status, reason)
+                logger.info('Got response for %s: %s', next_notification.token, result)
+                results[next_notification.token] = result
 
         return results
 
-    def update_max_concurrent_streams(self) -> None:
-        # Get the max_concurrent_streams setting returned by the server.
-        # The max_concurrent_streams value is saved in the H2Connection instance that must be
-        # accessed using a with statement in order to acquire a lock.
-        # pylint: disable=protected-access
-        with self._connection._conn as connection:
-            max_concurrent_streams = connection.remote_settings.max_concurrent_streams
-
-        if max_concurrent_streams == self.__previous_server_max_concurrent_streams:
-            # The server hasn't issued an updated SETTINGS frame.
-            return
-
-        self.__previous_server_max_concurrent_streams = max_concurrent_streams
-        # Handle and log unexpected values sent by APNs, just in case.
-        if max_concurrent_streams > CONCURRENT_STREAMS_SAFETY_MAXIMUM:
-            logger.warning('APNs max_concurrent_streams too high (%s), resorting to default maximum (%s)',
-                           max_concurrent_streams, CONCURRENT_STREAMS_SAFETY_MAXIMUM)
-            self.__max_concurrent_streams = CONCURRENT_STREAMS_SAFETY_MAXIMUM
-        elif max_concurrent_streams < 1:
-            logger.warning('APNs reported max_concurrent_streams less than 1 (%s), using value of 1',
-                           max_concurrent_streams)
-            self.__max_concurrent_streams = 1
-        else:
-            logger.info('APNs set max_concurrent_streams to %s', max_concurrent_streams)
-            self.__max_concurrent_streams = max_concurrent_streams
-
     def connect(self) -> None:
         """
         Establish a connection to APNs. If already connected, the function does nothing. If the
         connection fails, the function retries up to MAX_CONNECTION_RETRIES times.
         """
-        retries = 0
-        while retries < MAX_CONNECTION_RETRIES:
-            # noinspection PyBroadException
-            try:
-                self._connection.connect()
-                logger.info('Connected to APNs')
-                return
-            except Exception:  # pylint: disable=broad-except
-                # close the connnection, otherwise next connect() call would do nothing
-                self._connection.close()
-                retries += 1
-                logger.exception('Failed connecting to APNs (attempt %s of %s)', retries, MAX_CONNECTION_RETRIES)
-
-        raise ConnectionFailed()
+        # Not needed for HTTPX
+        logger.info('APNsClient.connect called')
Index: apns2-0.7.2/apns2/credentials.py
===================================================================
--- apns2-0.7.2.orig/apns2/credentials.py
+++ apns2-0.7.2/apns2/credentials.py
@@ -1,30 +1,18 @@
+import ssl
 import time
-from typing import Optional, Tuple, TYPE_CHECKING
+from typing import Optional, Tuple
 
 import jwt
 
-from hyper import HTTP20Connection  # type: ignore
-from hyper.tls import init_context  # type: ignore
-
-if TYPE_CHECKING:
-    from hyper.ssl_compat import SSLContext  # type: ignore
-
 DEFAULT_TOKEN_LIFETIME = 2700
 DEFAULT_TOKEN_ENCRYPTION_ALGORITHM = 'ES256'
 
 
 # Abstract Base class. This should not be instantiated directly.
 class Credentials(object):
-    def __init__(self, ssl_context: 'Optional[SSLContext]' = None) -> None:
+    def __init__(self, ssl_context: Optional[ssl.SSLContext] = None) -> None:
         super().__init__()
-        self.__ssl_context = ssl_context
-
-    # Creates a connection with the credentials, if available or necessary.
-    def create_connection(self, server: str, port: int, proto: Optional[str], proxy_host: Optional[str] = None,
-                          proxy_port: Optional[int] = None) -> HTTP20Connection:
-        # self.__ssl_context may be none, and that's fine.
-        return HTTP20Connection(server, port, ssl_context=self.__ssl_context, force_proto=proto or 'h2',
-                                secure=True, proxy_host=proxy_host, proxy_port=proxy_port)
+        self.ssl_context = ssl_context
 
     def get_authorization_header(self, topic: Optional[str]) -> Optional[str]:
         return None
@@ -32,11 +20,9 @@ class Credentials(object):
 
 # Credentials subclass for certificate authentication
 class CertificateCredentials(Credentials):
-    def __init__(self, cert_file: Optional[str] = None, password: Optional[str] = None,
-                 cert_chain: Optional[str] = None) -> None:
-        ssl_context = init_context(cert=cert_file, cert_password=password)
-        if cert_chain:
-            ssl_context.load_cert_chain(cert_chain)
+    def __init__(self, cert_file: Optional[str] = None, password: Optional[str] = None) -> None:
+        ssl_context = ssl.create_default_context()
+        ssl_context.load_cert_chain(cert_file, password=password)
         super(CertificateCredentials, self).__init__(ssl_context)
Index: apns2-0.7.2/setup.py
===================================================================
--- apns2-0.7.2.orig/setup.py
+++ apns2-0.7.2/setup.py
@@ -7,7 +7,7 @@ setup(
     version='0.7.2',
     packages=['apns2'],
     install_requires=[
-        'hyper>=0.7',
+        'httpx>=0.13',
         'PyJWT>=2.0.0',
         'cryptography>=1.7.2',
     ],
Index: apns2-0.7.2/test/test_client.py
===================================================================
--- apns2-0.7.2.orig/test/test_client.py
+++ apns2-0.7.2/test/test_client.py
@@ -2,6 +2,8 @@ import contextlib
 from unittest.mock import MagicMock, Mock, patch
 
 import pytest
+import httpx
+import respx
 
 from apns2.client import APNsClient, Credentials, CONCURRENT_STREAMS_SAFETY_MAXIMUM, Notification
 from apns2.errors import ConnectionFailed
@@ -9,7 +11,6 @@ from apns2.payload import Payload
 
 TOPIC = 'com.example.App'
 
-
 @pytest.fixture(scope='session')
 def tokens():
     return ['%064x' % i for i in range(1000)]
@@ -23,103 +24,37 @@ def notifications(tokens):
 
 @patch('apns2.credentials.init_context')
 @pytest.fixture
-def client(mock_connection):
-    with patch('apns2.credentials.HTTP20Connection') as mock_connection_constructor:
-        mock_connection_constructor.return_value = mock_connection
-        return APNsClient(credentials=Credentials())
-
-
-@pytest.fixture
-def mock_connection():
-    mock_connection = MagicMock()
-    mock_connection.__max_open_streams = 0
-    mock_connection.__open_streams = 0
-    mock_connection.__mock_results = None
-    mock_connection.__next_stream_id = 0
-
-    @contextlib.contextmanager
-    def mock_get_response(stream_id):
-        mock_connection.__open_streams -= 1
-        if mock_connection.__mock_results:
-            reason = mock_connection.__mock_results[stream_id]
-            response = Mock(status=200 if reason == 'Success' else 400)
-            response.read.return_value = ('{"reason": "%s"}' % reason).encode('utf-8')
-            yield response
-        else:
-            yield Mock(status=200)
-
-    def mock_request(*_args):
-        mock_connection.__open_streams += 1
-        mock_connection.__max_open_streams = max(mock_connection.__open_streams, mock_connection.__max_open_streams)
-
-        stream_id = mock_connection.__next_stream_id
-        mock_connection.__next_stream_id += 1
-        return stream_id
-
-    mock_connection.get_response.side_effect = mock_get_response
-    mock_connection.request.side_effect = mock_request
-    mock_connection._conn.__enter__.return_value = mock_connection._conn
-    mock_connection._conn.remote_settings.max_concurrent_streams = 500
-
-    return mock_connection
-
-
-def test_connect_establishes_connection(client, mock_connection):
-    client.connect()
-    mock_connection.connect.assert_called_once_with()
+def client(respx_mock, notifications):
+    return APNsClient(credentials=Credentials())
 
 
-def test_connect_retries_failed_connection(client, mock_connection):
-    mock_connection.connect.side_effect = [RuntimeError, RuntimeError, None]
-    client.connect()
-    assert mock_connection.connect.call_count == 3
-
-
-def test_connect_stops_on_reaching_max_retries(client, mock_connection):
-    mock_connection.connect.side_effect = [RuntimeError] * 4
-    with pytest.raises(ConnectionFailed):
-        client.connect()
-
-    assert mock_connection.connect.call_count == 3
-
-
-def test_send_empty_batch_does_nothing(client, mock_connection):
+@respx.mock
+def test_send_empty_batch_does_nothing(respx_mock, client):
     client.send_notification_batch([], TOPIC)
-    assert mock_connection.request.call_count == 0
+    assert respx_mock.calls == []
 
 
-def test_send_notification_batch_returns_results_in_order(client, mock_connection, tokens, notifications):
+@respx.mock
+def test_send_notification_batch_returns_results_in_order(respx_mock, client, tokens, notifications):
+    [respx_mock.post(f"https://api.push.apple.com/3/device/{token}") % 200
+     for token in tokens]
     results = client.send_notification_batch(notifications, TOPIC)
     expected_results = {token: 'Success' for token in tokens}
     assert results == expected_results
 
 
-def test_send_notification_batch_respects_max_concurrent_streams_from_server(client, mock_connection, tokens,
-                                                                              notifications):
-    client.send_notification_batch(notifications, TOPIC)
-    assert mock_connection.__max_open_streams == 500
-
-
-def test_send_notification_batch_overrides_server_max_concurrent_streams_if_too_large(client, mock_connection, tokens,
-                                                                                       notifications):
-    mock_connection._conn.remote_settings.max_concurrent_streams = 5000
-    client.send_notification_batch(notifications, TOPIC)
-    assert mock_connection.__max_open_streams == CONCURRENT_STREAMS_SAFETY_MAXIMUM
-
-
-def test_send_notification_batch_overrides_server_max_concurrent_streams_if_too_small(client, mock_connection, tokens,
-                                                                                       notifications):
-    mock_connection._conn.remote_settings.max_concurrent_streams = 0
-    client.send_notification_batch(notifications, TOPIC)
-    assert mock_connection.__max_open_streams == 1
-
-
-def test_send_notification_batch_reports_different_results(client, mock_connection, tokens,
-                                                            notifications):
-    mock_connection.__mock_results = (
+@respx.mock
+def test_send_notification_batch_reports_different_results(respx_mock, client, tokens, notifications):
+    call_results = (
         ['BadDeviceToken'] * 1000 + ['Success'] * 1000 +
         ['DeviceTokenNotForTopic'] * 2000 + ['Success'] * 1000 +
         ['BadDeviceToken'] * 500 + ['PayloadTooLarge'] * 4500
     )
+    for cr, token in zip(call_results, tokens):
+        status = 200
+        if cr != 'Success':
+            status = 400
+        res = httpx.Response(status, text=cr)
+        respx_mock.post(f"https://api.push.apple.com/3/device/{token}") % res
     results = client.send_notification_batch(notifications, TOPIC)
-    expected_results = dict(zip(tokens, mock_connection.__mock_results))
+    expected_results = dict(zip(tokens, call_results))
     assert results == expected_results
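For reference, below is a minimal usage sketch of the client API as patched above; it is not part of the patch. The class and method names come from the diff, while the certificate path, device token, and topic are placeholder values.

# Usage sketch of the httpx-based APNsClient introduced by use-httpx.patch.
# The certificate path, device token, and topic are placeholders, not part of the patch.
from apns2.client import APNsClient, Notification
from apns2.credentials import CertificateCredentials
from apns2.payload import Payload

credentials = CertificateCredentials('apns-cert.pem')  # placeholder certificate file
client = APNsClient(credentials, use_sandbox=True)

token = '0' * 64                                       # placeholder device token
payload = Payload(alert='Hello from the httpx backend')

# send_notification() opens a short-lived httpx.Client(http2=True) internally and
# raises via exception_class_for_reason() when APNs returns a non-200 status.
client.send_notification(token, payload, topic='com.example.App')

# send_notification_batch() reuses one httpx connection for all tokens and returns
# 'Success' or the APNs 'reason' string per token.
results = client.send_notification_batch(
    [Notification(token=token, payload=payload)], topic='com.example.App')
print(results)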