File size: 2,519 Bytes
47d9b1d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import queue
import asyncio


class IteratorPipe:
    """
    Iterator Pipe creates an iterator that can be fed in data from another block of code or thread of execution
    """

    def __init__(self, sentinel=object()):
        self._q = queue.Queue()
        self._sentinel = sentinel
        self._sentinel_pushed = False
        self._closed = False

    def __iter__(self):
        return self

    def __next__(self):
        if self._closed:
            raise StopIteration

        data = self._q.get(block=True)
        if data is self._sentinel:
            self._closed = True
            raise StopIteration

        return data

    def put(self, data) -> bool:
        """
        Pushes next item to Iterator and returns True
        If iterator has been closed via close(), doesn't push anything and returns False
        """
        if self._sentinel_pushed:
            return False

        self._q.put(data)
        return True

    def close(self):
        """
        Close is idempotent. Calling close multiple times is safe
        Iterator will raise StopIteration only after all elements pushed before close have been iterated
        """
        # make close idempotent
        if not self._sentinel_pushed:
            self._sentinel_pushed = True
        self._q.put(self._sentinel)


class AsyncIteratorPipe:

    def __init__(self, sentinel=object()):
        self._q = asyncio.Queue()
        self._sentinel = sentinel
        self._sentinel_pushed = False
        self._closed = False

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self._closed:
            raise StopAsyncIteration

        data = await self._q.get()
        if data is self._sentinel:
            self._closed = True
            raise StopAsyncIteration

        return data

    async def put(self, data) -> bool:
        """
        Pushes next item to Iterator and returns True
        If iterator has been closed via close(), doesn't push anything and returns False
        """
        if self._sentinel_pushed:
            return False

        await self._q.put(data)
        return True

    async def close(self):
        """
        Close is idempotent. Calling close multiple times is safe
        Iterator will raise StopIteration only after all elements pushed before close have been iterated
        """
        # make close idempotent
        if not self._sentinel_pushed:
            self._sentinel_pushed = True
            await self._q.put(self._sentinel)