Source code for pockets.iterators

# -*- coding: utf-8 -*-
# Copyright (c) 2018 the Pockets team, see AUTHORS.
# Licensed under the BSD License, see LICENSE for details.

"""A pocket full of useful iterators!"""

from __future__ import absolute_import, print_function

import collections

import six


__all__ = ["itermod", "iterpeek", "modify_iter", "peek_iter"]


class iterpeek(object):
    """
    An iterator object that supports peeking ahead.

    >>> p = iterpeek(["a", "b", "c", "d", "e"])
    >>> p.peek()
    'a'
    >>> p.next()
    'a'
    >>> p.peek(3)
    ['b', 'c', 'd']

    Args:
        o (iterable or callable): `o` is interpreted very differently
            depending on the presence of `sentinel`.

            If `sentinel` is not given, then `o` must be a collection
            object which supports either the iteration protocol or the
            sequence protocol.

            If `sentinel` is given, then `o` must be a callable object.

        sentinel (any value, optional): If given, the iterator will call
            `o` with no arguments for each call to its `next` method; if
            the value returned is equal to `sentinel`,
            :exc:`StopIteration` will be raised, otherwise the value will
            be returned.

    See Also:
        `iterpeek` can operate as a drop in replacement for the built-in
        `iter <https://docs.python.org/3/library/functions.html#iter>`_
        function.

    Attributes:
        sentinel (any value): The value used to indicate the iterator is
            exhausted. If `sentinel` was not given when the `iterpeek`
            was instantiated, then it will be set to a new object
            instance: ``object()``.

    """

    def __init__(self, *args):
        """__init__(o, sentinel=None)"""
        # The positional arguments are forwarded untouched so that both
        # built-in forms, iter(o) and iter(o, sentinel), are supported.
        self._iterable = iter(*args)
        # Items fetched ahead of time by peek()/has_next() wait here.
        self._cache = collections.deque()
        self.sentinel = args[1] if len(args) > 1 else object()

    def __iter__(self):
        return self

    def __next__(self, n=None):
        # NOTE: Prevent 2to3 from transforming self.next() in next(self),
        # which causes an infinite loop!
        return getattr(self, "next")(n)

    def _fillcache(self, n):
        """Cache `n` items. If `n` is 0 or None, then 1 item is cached.

        When the underlying iterator runs out before `n` items are
        cached, the remaining slots are filled with `sentinel`.
        """
        if not n:
            n = 1
        try:
            while len(self._cache) < n:
                self._cache.append(next(self._iterable))
        except StopIteration:
            while len(self._cache) < n:
                self._cache.append(self.sentinel)

    def has_next(self):
        """
        Determine if iterator is exhausted.

        Returns:
            bool: True if iterator has more items, False otherwise.

        Note:
            Will never raise :exc:`StopIteration`.

        """
        return self.peek() != self.sentinel

    def next(self, n=None):
        """
        Get the next item or `n` items of the iterator.

        Args:
            n (int, optional): The number of items to retrieve. Defaults
                to None.

        Returns:
            item or list of items: The next item or `n` items of the
            iterator.

            If `n` is None, the item itself is returned. If `n` is an
            int, the items will be returned in a list. If `n` is 0 or
            negative, an empty list is returned:

            >>> p = iterpeek(["a", "b", "c", "d", "e"])
            >>> p.next()
            'a'
            >>> p.next(0)
            []
            >>> p.next(1)
            ['b']
            >>> p.next(2)
            ['c', 'd']

        Raises:
            StopIteration: Raised if the iterator is exhausted, even if
                `n` is 0 or negative. Also raised, without consuming
                anything, if fewer than `n` items remain.

        """
        if n is None or n <= 0:
            # A negative count is handled like 0. Previously it indexed
            # the cache from the right, which raised IndexError or
            # returned [] depending on how much happened to be cached.
            # One cached item is enough to tell whether anything is left.
            self._fillcache(1)
            if self._cache[0] == self.sentinel:
                raise StopIteration
            return self._cache.popleft() if n is None else []

        self._fillcache(n)
        # The cache is padded with the sentinel once the source runs dry,
        # so a sentinel in the last requested slot means "not enough".
        if self._cache[n - 1] == self.sentinel:
            raise StopIteration
        return [self._cache.popleft() for _ in range(n)]

    def peek(self, n=None):
        """
        Preview the next item or `n` items of the iterator.

        The iterator is not advanced when peek is called.

        Args:
            n (int, optional): The number of items to retrieve. Defaults
                to None.

        Returns:
            item or list of items: The next item or `n` items of the
            iterator.

            If `n` is None, the item itself is returned. If `n` is an
            int, the items will be returned in a list. If `n` is 0, an
            empty list is returned.

            If the iterator is exhausted, `iterpeek.sentinel` is
            returned, or placed as the last item in the returned list:

            >>> p = iterpeek(["a", "b", "c"])
            >>> p.sentinel = "END"
            >>> p.peek()
            'a'
            >>> p.peek(0)
            []
            >>> p.peek(1)
            ['a']
            >>> p.peek(2)
            ['a', 'b']
            >>> p.peek(4)
            ['a', 'b', 'c', 'END']

        Note:
            Will never raise :exc:`StopIteration`.

        """
        self._fillcache(n)
        if n is None:
            return self._cache[0]
        return [self._cache[i] for i in range(n)]


# Backwards compatibility
peek_iter = iterpeek
class itermod(iterpeek):
    """
    An iterator object that supports modifying items as they are returned.

    >>> a = [" A list ",
    ...      " of strings ",
    ...      " with ",
    ...      " extra ",
    ...      " whitespace. "]
    >>> modifier = lambda s: s.strip().replace('with', 'without')
    >>> for s in itermod(a, modifier=modifier):
    ...     print('"%s"' % s)
    "A list"
    "of strings"
    "without"
    "extra"
    "whitespace."

    Args:
        o (iterable or callable): `o` is interpreted very differently
            depending on the presence of `sentinel`.

            If `sentinel` is not given, then `o` must be a collection
            object which supports either the iteration protocol or the
            sequence protocol.

            If `sentinel` is given, then `o` must be a callable object.

        sentinel (any value, optional): If given, the iterator will call
            `o` with no arguments for each call to its `next` method; if
            the value returned is equal to `sentinel`,
            :exc:`StopIteration` will be raised, otherwise the value will
            be returned.

        modifier (callable, optional): The function that will be used to
            modify each item returned by the iterator. `modifier` should
            take a single argument and return a single value. Defaults
            to ``lambda x: x``.

            If `sentinel` is not given, `modifier` must be passed as a
            keyword argument.

    Raises:
        TypeError: If `modifier` is not callable, or if it is given both
            as the third positional argument and as a keyword argument.

    Attributes:
        modifier (callable): `modifier` is called with each item in `o`
            as it is iterated. The return value of `modifier` is
            returned in lieu of the item.

            Values returned by `peek` as well as `next` are affected by
            `modifier`. However, `itermod.sentinel` is never passed
            through `modifier`; it will always be returned from `peek`
            unmodified.

    """

    def __init__(self, *args, **kwargs):
        """__init__(o, sentinel=None, modifier=lambda x: x)"""
        if "modifier" in kwargs:
            if len(args) > 2:
                # Without this check the extra positional argument was
                # forwarded to iter(), which failed with an unrelated
                # "iter expected at most 2 arguments" TypeError.
                raise TypeError(
                    "itermod() got multiple values for argument 'modifier'"
                )
            self.modifier = kwargs["modifier"]
        elif len(args) > 2:
            self.modifier = args[2]
            # Only (o, sentinel) may be passed on to iter().
            args = args[:2]
        else:
            self.modifier = lambda x: x
        if not six.callable(self.modifier):
            raise TypeError("itermod(o, modifier): modifier must be callable")
        super(itermod, self).__init__(*args)

    def _fillcache(self, n):
        """
        Cache `n` modified items. If `n` is 0 or None, 1 item is cached.

        Each item returned by the iterator is passed through the
        `itermod.modifier` function before being cached. When the
        iterator runs out, the remaining slots are filled with the
        unmodified `sentinel`.

        """
        if not n:
            n = 1
        try:
            while len(self._cache) < n:
                self._cache.append(self.modifier(next(self._iterable)))
        except StopIteration:
            while len(self._cache) < n:
                self._cache.append(self.sentinel)


# Backwards compatibility
modify_iter = itermod