Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
openSUSE:Leap:15.2:FactoryCandidates
python-streamz
streamz-pr455-ci-fixes.patch
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File streamz-pr455-ci-fixes.patch of Package python-streamz
From 7cc8ac57cae702c3a7ac3b8aed9043dad367c1a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20H=C3=B8xbro?= <simon.hansen@me.com> Date: Sun, 11 Sep 2022 10:51:56 +0200 Subject: [PATCH 01/12] Using _repr_mimebundle if attribute on Output --- streamz/core.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) Index: streamz-0.6.4/streamz/core.py =================================================================== --- streamz-0.6.4.orig/streamz/core.py +++ streamz-0.6.4/streamz/core.py @@ -379,13 +379,14 @@ class Stream(APIRegisterMixin): __repr__ = __str__ def _ipython_display_(self, **kwargs): # pragma: no cover + # Since this function is only called by jupyter, this import must succeed + from IPython.display import HTML, display + try: import ipywidgets from IPython.core.interactiveshell import InteractiveShell output = ipywidgets.Output(_view_count=0) except ImportError: - # since this function is only called by jupyter, this import must succeed - from IPython.display import display, HTML if hasattr(self, '_repr_html_'): return display(HTML(self._repr_html_())) else: @@ -420,7 +421,11 @@ class Stream(APIRegisterMixin): output.observe(remove_stream, '_view_count') - return output._ipython_display_(**kwargs) + if hasattr(output, "_repr_mimebundle_"): + data = output._repr_mimebundle_(**kwargs) + return display(data, raw=True) + else: + return output._ipython_display_(**kwargs) def _emit(self, x, metadata=None): """ @@ -1468,18 +1473,23 @@ class zip(Stream): def __init__(self, *upstreams, **kwargs): self.maxsize = kwargs.pop('maxsize', 10) - self.condition = Condition() + self._condition = None self.literals = [(i, val) for i, val in enumerate(upstreams) if not isinstance(val, Stream)] self.buffers = {upstream: deque() for upstream in upstreams if isinstance(upstream, Stream)} - upstreams2 = [upstream for upstream in upstreams if isinstance(upstream, Stream)] Stream.__init__(self, upstreams=upstreams2, **kwargs) + @property + def condition(self): + if self._condition is None: + self._condition = Condition() + return self._condition + def _add_upstream(self, upstream): # Override method to handle setup of buffer for new stream self.buffers[upstream] = deque() @@ -1876,7 +1886,7 @@ class latest(Stream): _graphviz_shape = 'octagon' def __init__(self, upstream, **kwargs): - self.condition = Condition() + self._condition = None self.next = [] self.next_metadata = None @@ -1885,6 +1895,12 @@ class latest(Stream): self.loop.add_callback(self.cb) + @property + def condition(self): + if self._condition is None: + self._condition = Condition() + return self._condition + def update(self, x, who=None, metadata=None): if self.next_metadata: self._release_refs(self.next_metadata) Index: streamz-0.6.4/streamz/tests/py3_test_core.py =================================================================== --- streamz-0.6.4.orig/streamz/tests/py3_test_core.py +++ streamz-0.6.4/streamz/tests/py3_test_core.py @@ -1,16 +1,16 @@ # flake8: noqa +import asyncio from time import time -from distributed.utils_test import loop, inc # noqa -from tornado import gen +from distributed.utils_test import inc # noqa from streamz import Stream -def test_await_syntax(loop): # noqa +def test_await_syntax(): # noqa L = [] async def write(x): - await gen.sleep(0.1) + await asyncio.sleep(0.1) L.append(x) async def f(): @@ -25,4 +25,4 @@ def test_await_syntax(loop): # noqa assert 0.2 < stop - start < 0.4 assert 2 <= len(L) <= 4 - loop.run_sync(f) + asyncio.run(f()) Index: streamz-0.6.4/streamz/tests/test_core.py =================================================================== --- streamz-0.6.4.orig/streamz/tests/test_core.py +++ streamz-0.6.4/streamz/tests/test_core.py @@ -1,3 +1,4 @@ +import asyncio from datetime import timedelta from functools import partial import itertools @@ -12,6 +13,7 @@ import pytest from tornado.queues import Queue from tornado.ioloop import IOLoop +from tornado import gen import streamz as sz @@ -19,7 +21,7 @@ from streamz import RefCounter from streamz.sources import sink_to_file from streamz.utils_test import (inc, double, gen_test, tmpfile, captured_logger, # noqa: F401 clean, await_for, metadata, wait_for) # noqa: F401 -from distributed.utils_test import loop # noqa: F401 +from distributed.utils_test import loop, loop_in_thread, cleanup # noqa: F401 def test_basic(): @@ -1485,20 +1487,6 @@ def dont_test_stream_kwargs(clean): # n sin.emit(1) -@pytest.fixture -def thread(loop): # noqa: F811 - from threading import Thread, Event - thread = Thread(target=loop.start) - thread.daemon = True - thread.start() - - event = Event() - loop.add_callback(event.set) - event.wait() - - return thread - - def test_percolate_loop_information(clean): # noqa: F811 source = Stream() assert not source.loop @@ -1506,16 +1494,6 @@ def test_percolate_loop_information(clea assert source.loop is s.loop -def test_separate_thread_without_time(loop, thread): # noqa: F811 - assert thread.is_alive() - source = Stream(loop=loop) - L = source.map(inc).sink_to_list() - - for i in range(10): - source.emit(i) - assert L[-1] == i + 1 - - def test_separate_thread_with_time(clean): # noqa: F811 L = [] Index: streamz-0.6.4/streamz/tests/test_dask.py =================================================================== --- streamz-0.6.4.orig/streamz/tests/test_dask.py +++ streamz-0.6.4/streamz/tests/test_dask.py @@ -72,10 +72,10 @@ async def test_partition_then_scatter_as assert L == [1, 2, 3] -def test_partition_then_scatter_sync(loop): +def test_partition_then_scatter_sync(): # Ensure partition w/ timeout before scatter works correctly for synchronous with cluster() as (s, [a, b]): - with Client(s['address'], loop=loop) as client: # noqa: F841 + with Client(s['address']) as client: # noqa: F841 start = time.monotonic() source = Stream() L = source.partition(2, timeout=.1).scatter().map( @@ -164,9 +164,9 @@ async def test_accumulate(c, s, a, b): assert L[-1][1] == 3 -def test_sync(loop): # noqa: F811 +def test_sync(): # noqa: F811 with cluster() as (s, [a, b]): - with Client(s['address'], loop=loop) as client: # noqa: F841 + with Client(s['address']) as client: # noqa: F841 source = Stream() L = source.scatter().map(inc).gather().sink_to_list() @@ -174,14 +174,14 @@ def test_sync(loop): # noqa: F811 for i in range(10): await source.emit(i, asynchronous=True) - sync(loop, f) + sync(client.loop, f) assert L == list(map(inc, range(10))) -def test_sync_2(loop): # noqa: F811 +def test_sync_2(): # noqa: F811 with cluster() as (s, [a, b]): - with Client(s['address'], loop=loop): # noqa: F841 + with Client(s['address']): # noqa: F841 source = Stream() L = source.scatter().map(inc).gather().sink_to_list() @@ -218,9 +218,9 @@ async def test_buffer(c, s, a, b): assert source.loop == c.loop -def test_buffer_sync(loop): # noqa: F811 +def test_buffer_sync(): # noqa: F811 with cluster() as (s, [a, b]): - with Client(s['address'], loop=loop) as c: # noqa: F841 + with Client(s['address']) as c: # noqa: F841 source = Stream() buff = source.scatter().map(slowinc, delay=0.5).buffer(5) L = buff.gather().sink_to_list() @@ -241,10 +241,11 @@ def test_buffer_sync(loop): # noqa: F81 assert L == list(map(inc, range(10))) +@pytest.mark.asyncio @pytest.mark.xfail(reason='') -async def test_stream_shares_client_loop(loop): # noqa: F811 +async def test_stream_shares_client_loop(): # noqa: F811 with cluster() as (s, [a, b]): - with Client(s['address'], loop=loop) as client: # noqa: F841 + with Client(s['address']) as client: # noqa: F841 source = Stream() d = source.timed_window('20ms').scatter() # noqa: F841 assert source.loop is client.loop Index: streamz-0.6.4/ci/environment-py310.yml =================================================================== --- /dev/null +++ streamz-0.6.4/ci/environment-py310.yml @@ -0,0 +1,27 @@ +name: test_env +channels: + - conda-forge + - defaults +dependencies: + - python=3.10 + - pytest + - flake8 + - black + - isort + - tornado + - toolz + - librdkafka + - dask + - distributed + - pandas + - python-confluent-kafka + - codecov + - coverage + - networkx + - graphviz + - pytest-asyncio + - python-graphviz + - bokeh + - ipywidgets + - flaky + - pytest-cov Index: streamz-0.6.4/ci/environment-py39.yml =================================================================== --- /dev/null +++ streamz-0.6.4/ci/environment-py39.yml @@ -0,0 +1,34 @@ +name: test_env +channels: + - conda-forge + - defaults +dependencies: + - python=3.9 + - pytest + - flake8 + - black + - isort + - tornado + - toolz + - zict + - six + - librdkafka=1.5.3 + - dask + - distributed + - pandas + - python-confluent-kafka=1.5.0 + - numpydoc + - sphinx + - sphinx_rtd_theme + - codecov + - coverage + - networkx + - graphviz + - pytest-asyncio + - python-graphviz + - bokeh + - ipython + - ipykernel + - ipywidgets + - flaky + - pytest-cov Index: streamz-0.6.4/streamz/dataframe/tests/test_dataframes.py =================================================================== --- streamz-0.6.4.orig/streamz/dataframe/tests/test_dataframes.py +++ streamz-0.6.4/streamz/dataframe/tests/test_dataframes.py @@ -219,7 +219,7 @@ def test_binary_stream_operators(stream) a.emit(df) - assert_eq(b[0], expected) + wait_for(lambda: b and b[0].equals(expected), 1) def test_index(stream): @@ -246,7 +246,7 @@ def test_pair_arithmetic(stream): a.emit(df.iloc[:5]) a.emit(df.iloc[5:]) - assert len(L) == 2 + wait_for(lambda: len(L) == 2, 1) assert_eq(pd.concat(L, axis=0), (df.x + df.y) * 2) @@ -259,7 +259,7 @@ def test_getitem(stream): a.emit(df.iloc[:5]) a.emit(df.iloc[5:]) - assert len(L) == 2 + wait_for(lambda: len(L) == 2, 1) assert_eq(pd.concat(L, axis=0), df[df.x > 4]) @@ -298,6 +298,7 @@ def test_groupby_aggregate(agg, grouper, a.emit(df.iloc[7:]) first = df.iloc[:3] + wait_for(lambda: len(L) > 2, 1) assert assert_eq(L[0], f(first)) assert assert_eq(L[-1], f(df)) @@ -382,7 +383,7 @@ def test_setitem(stream): df['a'] = 10 df[['c', 'd']] = df[['x', 'y']] - assert_eq(L[-1], df.mean()) + wait_for(lambda: L and L[-1].equals(df.mean()), 1) def test_setitem_overwrites(stream): Index: streamz-0.6.4/setup.py =================================================================== --- streamz-0.6.4.orig/setup.py +++ streamz-0.6.4/setup.py @@ -17,7 +17,7 @@ setup(name='streamz', license='BSD', keywords='streams', packages=packages + tests, - python_requires='>=3.7', + python_requires='>=3.8', long_description=(open('README.rst').read() if exists('README.rst') else ''), install_requires=list(open('requirements.txt').read().strip().split('\n')), Index: streamz-0.6.4/streamz/tests/test_kafka.py =================================================================== --- streamz-0.6.4.orig/streamz/tests/test_kafka.py +++ streamz-0.6.4/streamz/tests/test_kafka.py @@ -55,8 +55,8 @@ def launch_kafka(): cmd = ("docker run -d -p 2181:2181 -p 9092:9092 --env " "ADVERTISED_HOST=127.0.0.1 --env ADVERTISED_PORT=9092 " "--name streamz-kafka spotify/kafka") - print(cmd) - cid = subprocess.check_output(shlex.split(cmd)).decode()[:-1] + cid = subprocess.check_output(shlex.split(cmd), + stderr=subprocess.DEVNULL).decode()[:-1] def end(): if cid: @@ -66,11 +66,11 @@ def launch_kafka(): def predicate(): try: out = subprocess.check_output(['docker', 'logs', cid], - stderr=subprocess.STDOUT) - return b'kafka entered RUNNING state' in out + stderr=subprocess.STDOUT) + return b'RUNNING' in out except subprocess.CalledProcessError: pass - wait_for(predicate, 10, period=0.1) + wait_for(predicate, 45, period=0.1) return cid @@ -169,7 +169,7 @@ def test_from_kafka_thread(): stream = Stream.from_kafka([TOPIC], ARGS) out = stream.sink_to_list() stream.start() - yield gen.sleep(1.1) + yield await_for(lambda: stream.started, 10, period=0.1) for i in range(10): yield gen.sleep(0.1) kafka.produce(TOPIC, b'value-%d' % i) @@ -182,14 +182,6 @@ def test_from_kafka_thread(): kafka.flush() yield await_for(lambda: out[-1] == b'final message', 10, period=0.1) - stream._close_consumer() - kafka.produce(TOPIC, b'lost message') - kafka.flush() - # absolute sleep here, since we expect output list *not* to change - yield gen.sleep(1) - assert out[-1] == b'final message' - stream._close_consumer() - def test_kafka_batch(): j = random.randint(0, 10000) @@ -585,6 +577,8 @@ def test_kafka_checkpointing_auto_offset stream1 = Stream.from_kafka_batched(TOPIC, ARGS, asynchronous=True) out1 = stream1.map(split).gather().sink_to_list() + time.sleep(1) # messages make ttheir way through kafka + stream1.start() wait_for(lambda: stream1.upstream.started, 10, period=0.1)
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor