Stream
Module for creating and composing streams.
Streams are composable, lazy enumerables. Any enumerable that generates
items one by one during enumeration is called a stream. For example,
Elixir's Range is a stream:
iex> range = 1..5
1..5
iex> Enum.map range, &(&1 * 2)
[2,4,6,8,10]
In the example above, as we mapped over the range, the elements being
enumerated were created one by one, during enumeration. The Stream
module allows us to map the range, without triggering its enumeration:
iex> range = 1..3
iex> stream = Stream.map(range, &(&1 * 2))
iex> Enum.map(stream, &(&1 + 1))
[3,5,7]
Notice we started with a range and then we created a stream that is
meant to multiply each item in the range by 2. At this point, no
computation has been done yet. Only when Enum.map/2 is called do we
enumerate over each item in the range, multiplying it by 2 and adding 1.
We say the functions in Stream are lazy and the functions in Enum
are eager.
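The lazy/eager distinction can be observed directly. The following is a minimal sketch, assuming a counter kept in the process dictionary (the `:calls` key is hypothetical and exists only for this illustration) to record when the mapping function actually runs:

```elixir
# Count invocations of the mapping function in the process dictionary.
# The :calls key is made up for this illustration.
Process.put(:calls, 0)

stream =
  Stream.map(1..3, fn x ->
    Process.put(:calls, Process.get(:calls) + 1)
    x * 2
  end)

# Building the stream ran nothing, so the counter has not moved.
calls_before = Process.get(:calls)

# Enumerating the stream runs the function once per element.
result = Enum.to_list(stream)
calls_after = Process.get(:calls)
```

Here `calls_before` is 0 and `calls_after` is 3: the function only ran once Enum.to_list/1 enumerated the stream.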
Due to their laziness, streams are useful when working with large
(or even infinite) collections. When chaining many operations with Enum,
intermediate lists are created, while Stream creates a recipe of
computations that are executed at a later moment. Let's see another
example:
1..3 |>
Enum.map(&IO.inspect(&1)) |>
Enum.map(&(&1 * 2)) |>
Enum.map(&IO.inspect(&1))
1
2
3
2
4
6
#=> [2,4,6]
Notice that we first printed each item in the list, then multiplied each element by 2 and finally printed each new value. In this example, the list was enumerated three times. Let's see an example with streams:
stream = 1..3 |>
Stream.map(&IO.inspect(&1)) |>
Stream.map(&(&1 * 2)) |>
Stream.map(&IO.inspect(&1))
Enum.to_list(stream)
1
2
2
4
3
6
#=> [2,4,6]
Although the end result is the same, the order in which the items were printed changed! With streams, we print the first item and then print its double. In this example, the list was enumerated just once!
That's what we meant when we first said that streams are composable,
lazy enumerables. Notice that we could call Stream.map/2 multiple times,
effectively composing the streams, and that they stay lazy: the computations
are performed only when you call a function from the Enum module.
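Because nothing runs until an Enum function asks for values, the same composition also works on an infinite source. A small sketch, assuming Stream.iterate/2 as the infinite source (the variable names are illustrative only):

```elixir
# Stream.iterate/2 produces 1, 2, 3, ... without end.
even_squares =
  Stream.iterate(1, &(&1 + 1))
  |> Stream.map(&(&1 * &1))
  |> Stream.filter(&(rem(&1, 2) == 0))

# Enum.take/2 stops the enumeration once three results are available.
first_three = Enum.take(even_squares, 3)
```

Only the first six integers are ever generated here; `first_three` is `[4, 16, 36]`.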
Creating Streams
There are many functions in Elixir's standard library that return streams. Some examples are:
- IO.stream/2 - Streams input lines, one by one;
- URI.query_decoder/1 - Decodes a query string, pair by pair;
This module also provides many convenience functions for creating streams,
like Stream.cycle/1, Stream.unfold/2, Stream.resource/3 and more.
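As one illustration of these convenience functions, the sketch below builds a Fibonacci sequence with Stream.unfold/2. The accumulator shape `{a, b}` is an arbitrary choice for this example, not something the module prescribes:

```elixir
# The accumulator holds the current and the next Fibonacci number.
# Each step emits `a` and moves the pair one position forward.
fibs = Stream.unfold({0, 1}, fn {a, b} -> {a, {b, a + b}} end)

# The stream is infinite, so take a bounded prefix.
first_eight = Enum.take(fibs, 8)
```

The function never returns nil, so the stream never finishes on its own; `first_eight` is `[0, 1, 1, 2, 3, 5, 8, 13]`.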
Summary
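| after(lazy, fun) | Executes the given function when the stream is done or halted, or when an error happens during streaming. Useful for resource cleanup |
| chunk(enum, n) | Shortcut to chunk(enum, n, n) |
| chunk(enum, n, step, pad \\ nil) | Streams the enumerable in chunks, containing n items each, where each new chunk starts step elements into the enumerable |
| chunk_by(enum, fun) | Chunks the enum by buffering elements for which fun returns the same value and only emits them when fun returns a new value or the enum finishes |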
| concat(enumerables) | Creates a stream that enumerates each enumerable in an enumerable |
| concat(first, second) | Creates a stream that enumerates the first argument, followed by the second |
| cycle(enumerable) | Creates a stream that cycles through the given enumerable, infinitely |
| drop(enum, n) | Lazily drops the next n items from the enumerable |
| drop_while(enum, fun) | Lazily drops elements of the enumerable while the given function returns true |
| each(enum, fun) | Executes the given function for each item |
| filter(enum, fun) | Creates a stream that filters elements according to the given function on enumeration |
| filter_map(enum, filter, mapper) | Creates a stream that filters and then maps elements according to given functions |
| flat_map(enum, mapper) | Creates a stream that will apply the given function on enumeration and flatten the result |
| iterate(start_value, next_fun) | Emits a sequence of values, starting with start_value |
| map(enum, fun) | Creates a stream that will apply the given function on enumeration |
| reject(enum, fun) | Creates a stream that will reject elements according to the given function on enumeration |
| repeatedly(generator_fun) | Returns a stream generated by calling generator_fun repeatedly |
| resource(start_fun, next_fun, after_fun) | Emits a sequence of values for the given resource |
| run(stream) | Runs the given stream |
| scan(enum, fun) | Creates a stream that applies the given function to each element, emits the result and uses the same result as the accumulator for the next computation |
| scan(enum, acc, fun) | Creates a stream that applies the given function to each element, emits the result and uses the same result as the accumulator for the next computation. Uses the given acc as the starting value |
| take(enum, n) | Lazily takes the next n items from the enumerable and stops enumeration |
| take_every(enum, n) | Creates a stream that takes every nth item from the enumerable |
| take_while(enum, fun) | Lazily takes elements of the enumerable while the given function returns true |
| transform(enum, acc, reducer) | Transforms an existing stream |
| unfold(next_acc, next_fun) | Emits a sequence of values for the given accumulator |
| uniq(enum, fun \\ fn x -> x end) | Creates a stream that only emits elements if they are unique |
| with_index(enum) | Creates a stream where each item in the enumerable will be wrapped in a tuple alongside its index |
| zip(left, right) | Zips two collections together, lazily |
Functions
Specs:
- after(Enumerable.t, (() -> term)) :: Enumerable.t
Executes the given function when the stream is done or halted, or when an error happens during streaming. Useful for resource cleanup.
Callbacks registered later will be executed earlier.
Examples
iex> stream = Stream.after [1,2,3], fn -> Process.put(:done, true) end
iex> Enum.to_list(stream)
[1,2,3]
iex> Process.get(:done)
true
Specs:
- chunk(Enumerable.t, non_neg_integer, non_neg_integer, Enumerable.t | nil) :: Enumerable.t
Streams the enumerable in chunks, containing n items each, where
each new chunk starts step elements into the enumerable.
step is optional and, if not passed, defaults to n, i.e.
chunks do not overlap. If the final chunk does not have n
elements to fill the chunk, elements are taken as necessary
from pad if it was passed. If pad is passed and does not
have enough elements to fill the chunk, then the chunk is
returned anyway with fewer than n elements. If pad is not
passed at all or is nil, then the partial chunk is discarded
from the result.
Examples
iex> Stream.chunk([1, 2, 3, 4, 5, 6], 2) |> Enum.to_list
[[1, 2], [3, 4], [5, 6]]
iex> Stream.chunk([1, 2, 3, 4, 5, 6], 3, 2) |> Enum.to_list
[[1, 2, 3], [3, 4, 5]]
iex> Stream.chunk([1, 2, 3, 4, 5, 6], 3, 2, [7]) |> Enum.to_list
[[1, 2, 3], [3, 4, 5], [5, 6, 7]]
iex> Stream.chunk([1, 2, 3, 4, 5, 6], 3, 3, []) |> Enum.to_list
[[1, 2, 3], [4, 5, 6]]
Specs:
- chunk_by(Enumerable.t, (element -> any)) :: Enumerable.t
Chunks the enum by buffering elements for which fun returns
the same value and only emits them when fun returns a new value
or the enum finishes.
Examples
iex> stream = Stream.chunk_by([1, 2, 2, 3, 4, 4, 6, 7, 7], &(rem(&1, 2) == 1))
iex> Enum.to_list(stream)
[[1], [2, 2], [3], [4, 4, 6], [7, 7]]
Specs:
- concat(Enumerable.t) :: Enumerable.t
Creates a stream that enumerates each enumerable in an enumerable.
Examples
iex> stream = Stream.concat([1..3, 4..6, 7..9])
iex> Enum.to_list(stream)
[1,2,3,4,5,6,7,8,9]
Specs:
- concat(Enumerable.t, Enumerable.t) :: Enumerable.t
Creates a stream that enumerates the first argument, followed by the second.
Examples
iex> stream = Stream.concat(1..3, 4..6)
iex> Enum.to_list(stream)
[1,2,3,4,5,6]
iex> stream1 = Stream.cycle([1, 2, 3])
iex> stream2 = Stream.cycle([4, 5, 6])
iex> stream = Stream.concat(stream1, stream2)
iex> Enum.take(stream, 6)
[1,2,3,1,2,3]
Specs:
- cycle(Enumerable.t) :: Enumerable.t
Creates a stream that cycles through the given enumerable, infinitely.
Examples
iex> stream = Stream.cycle([1,2,3])
iex> Enum.take(stream, 5)
[1,2,3,1,2]
Specs:
- drop(Enumerable.t, non_neg_integer) :: Enumerable.t
Lazily drops the next n items from the enumerable.
If a negative n is given, it will drop the last n items from
the collection. Note that the mechanism by which this is implemented
will delay the emission of any item until n additional items have
been emitted by the enum.
Examples
iex> stream = Stream.drop(1..10, 5)
iex> Enum.to_list(stream)
[6,7,8,9,10]
iex> stream = Stream.drop(1..10, -5)
iex> Enum.to_list(stream)
[1,2,3,4,5]
Specs:
- drop_while(Enumerable.t, (element -> as_boolean(term))) :: Enumerable.t
Lazily drops elements of the enumerable while the given
function returns true.
Examples
iex> stream = Stream.drop_while(1..10, &(&1 <= 5))
iex> Enum.to_list(stream)
[6,7,8,9,10]
Specs:
- each(Enumerable.t, (element -> term)) :: Enumerable.t
Executes the given function for each item.
Useful for adding side effects (like printing) to a stream.
Examples
iex> stream = Stream.each([1, 2, 3], fn(x) -> send self, x end)
iex> Enum.to_list(stream)
iex> receive do: (x when is_integer(x) -> x)
1
iex> receive do: (x when is_integer(x) -> x)
2
iex> receive do: (x when is_integer(x) -> x)
3
Specs:
- filter(Enumerable.t, (element -> as_boolean(term))) :: Enumerable.t
Creates a stream that filters elements according to the given function on enumeration.
Examples
iex> stream = Stream.filter([1, 2, 3], fn(x) -> rem(x, 2) == 0 end)
iex> Enum.to_list(stream)
[2]
Specs:
- filter_map(Enumerable.t, (element -> as_boolean(term)), (element -> any)) :: Enumerable.t
Creates a stream that filters and then maps elements according to given functions.
Exists for symmetry with Enum.filter_map/3.
Examples
iex> stream = Stream.filter_map(1..6, fn(x) -> rem(x, 2) == 0 end, &(&1 * 2))
iex> Enum.to_list(stream)
[4,8,12]
Specs:
- flat_map(Enumerable.t, (element -> Enumerable.t)) :: Enumerable.t
Creates a stream that will apply the given function on enumeration and flatten the result.
Examples
iex> stream = Stream.flat_map([1, 2, 3], fn(x) -> [x, x * 2] end)
iex> Enum.to_list(stream)
[1, 2, 2, 4, 3, 6]
Specs:
- iterate(element, (element -> element)) :: Enumerable.t
Emits a sequence of values, starting with start_value. Successive
values are generated by calling next_fun on the previous value.
Examples
iex> Stream.iterate(0, &(&1+1)) |> Enum.take(5)
[0,1,2,3,4]
Specs:
- map(Enumerable.t, (element -> any)) :: Enumerable.t
Creates a stream that will apply the given function on enumeration.
Examples
iex> stream = Stream.map([1, 2, 3], fn(x) -> x * 2 end)
iex> Enum.to_list(stream)
[2,4,6]
Specs:
- reject(Enumerable.t, (element -> as_boolean(term))) :: Enumerable.t
Creates a stream that will reject elements according to the given function on enumeration.
Examples
iex> stream = Stream.reject([1, 2, 3], fn(x) -> rem(x, 2) == 0 end)
iex> Enum.to_list(stream)
[1,3]
Specs:
- repeatedly((() -> element)) :: Enumerable.t
Returns a stream generated by calling generator_fun repeatedly.
Examples
iex> Stream.repeatedly(&:random.uniform/0) |> Enum.take(3)
[0.4435846174457203, 0.7230402056221108, 0.94581636451987]
Specs:
Emits a sequence of values for the given resource.
Similar to unfold/2 but the initial value is computed lazily via
start_fun and executes an after_fun at the end of enumeration
(both in cases of success and failure).
Successive values are generated by calling next_fun with the
previous accumulator (the initial value being the result returned
by start_fun) and it must return a tuple with the current value and the
next accumulator. The enumeration finishes if it returns nil.
As the name says, this function is useful to stream values from resources.
Examples
Stream.resource(fn -> File.open!("sample") end,
                fn file ->
                  case IO.read(file, :line) do
                    data when is_binary(data) -> { data, file }
                    _ -> nil
                  end
                end,
                fn file -> File.close(file) end)
Specs:
- run(Enumerable.t) :: :ok
Runs the given stream.
This is useful when a stream needs to be run, for side effects, and there is no interest in its return result.
Examples
Open up a file, replace all # by % and stream to another file
without loading the whole file in memory:
stream = File.stream!("code")
         |> Stream.map(&String.replace(&1, "#", "%"))
         |> File.stream_to!("new")
No computation will be done until we call one of the Enum functions
or Stream.run/1.
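A smaller sketch of the same idea that does not touch the file system: the stream below only has a side effect (sending a message to the current process), and Stream.run/1 is what triggers it. The `{:seen, x}` message shape is an assumption made for this example:

```elixir
# Each element is sent to the current process as a side effect.
stream = Stream.each(1..3, fn x -> send(self(), {:seen, x}) end)

# Stream.run/1 forces the enumeration and discards the elements.
result = Stream.run(stream)

# Collect the messages that the side effect delivered to this process.
seen =
  for _ <- 1..3 do
    receive do
      {:seen, x} -> x
    after
      0 -> :timeout
    end
  end
```

`result` is `:ok` and `seen` is `[1, 2, 3]`. Before the call to Stream.run/1, no message had been sent.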
Specs:
- scan(Enumerable.t, (element, acc -> any)) :: Enumerable.t
Creates a stream that applies the given function to each element, emits the result and uses the same result as the accumulator for the next computation.
Examples
iex> stream = Stream.scan(1..5, &(&1 + &2))
iex> Enum.to_list(stream)
[1,3,6,10,15]
Specs:
- scan(Enumerable.t, acc, (element, acc -> any)) :: Enumerable.t
Creates a stream that applies the given function to each
element, emits the result and uses the same result as the accumulator
for the next computation. Uses the given acc as the starting value.
Examples
iex> stream = Stream.scan(1..5, 0, &(&1 + &2))
iex> Enum.to_list(stream)
[1,3,6,10,15]
Specs:
- take(Enumerable.t, non_neg_integer) :: Enumerable.t
Lazily takes the next n items from the enumerable and stops
enumeration.
If a negative n is given, the last n values will be taken.
To do so, the collection is fully enumerated, keeping up to 2 * n
elements in memory. Once the end of the collection is reached,
the last n elements are emitted. Therefore, using
a negative n on an infinite collection will never return.
Examples
iex> stream = Stream.take(1..100, 5)
iex> Enum.to_list(stream)
[1,2,3,4,5]
iex> stream = Stream.take(1..100, -5)
iex> Enum.to_list(stream)
[96,97,98,99,100]
iex> stream = Stream.cycle([1, 2, 3]) |> Stream.take(5)
iex> Enum.to_list(stream)
[1,2,3,1,2]
Specs:
- take_every(Enumerable.t, non_neg_integer) :: Enumerable.t
Creates a stream that takes every nth item from the enumerable.
The first item is always included, unless n is 0.
Examples
iex> stream = Stream.take_every(1..10, 2)
iex> Enum.to_list(stream)
[1,3,5,7,9]
Specs:
- take_while(Enumerable.t, (element -> as_boolean(term))) :: Enumerable.t
Lazily takes elements of the enumerable while the given
function returns true.
Examples
iex> stream = Stream.take_while(1..100, &(&1 <= 5))
iex> Enum.to_list(stream)
[1,2,3,4,5]
Specs:
- transform(Enumerable.t, acc, fun) :: Enumerable.t when fun: (element, acc -> {Enumerable.t, acc} | {:halt, acc}), acc: any
Transforms an existing stream.
It expects an accumulator and a function that receives each stream item
and an accumulator, and must return a tuple containing a new stream
(often a list) with the new accumulator or a tuple with :halt as first
element and the accumulator as second.
Note: this function is similar to Enum.flat_map_reduce/3 except the
latter returns both the flat list and accumulator, while this one returns
only the stream.
Examples
Stream.transform/3 is useful because it can be used as the basis to implement
many of the functions defined in this module. For example, we can implement
Stream.take(enum, n) as follows:
iex> enum = 1..100
iex> n = 3
iex> stream = Stream.transform(enum, 0, fn i, acc ->
...> if acc < n, do: { [i], acc + 1 }, else: { :halt, acc }
...> end)
iex> Enum.to_list(stream)
[1,2,3]
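The accumulator can also carry state from one element to the next. As a further sketch (not a function documented on this page), the following drops consecutive duplicates by remembering the previous element. The `:none` initial value and the `{:prev, x}` wrapper are choices made for this illustration:

```elixir
dedup =
  Stream.transform([1, 1, 2, 2, 2, 3, 1], :none, fn
    # Same value as the previous element: emit nothing.
    x, {:prev, x} -> {[], {:prev, x}}
    # First element or a new value: emit it and remember it.
    x, _acc -> {[x], {:prev, x}}
  end)

result = Enum.to_list(dedup)
```

Returning an empty list emits nothing for that element, so `result` is `[1, 2, 3, 1]`.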
Specs:
- unfold(acc, (acc -> {element, acc} | nil)) :: Enumerable.t
Emits a sequence of values for the given accumulator.
Successive values are generated by calling next_fun with the previous
accumulator and it must return a tuple with the current value and the next
accumulator. The enumeration finishes if it returns nil.
Examples
iex> Stream.unfold(5, fn 0 -> nil; n -> {n, n-1} end) |> Enum.to_list()
[5, 4, 3, 2, 1]
Specs:
- uniq(Enumerable.t, (element -> term)) :: Enumerable.t
Creates a stream that only emits elements if they are unique.
Keep in mind that, in order to know if an element is unique or not, this function needs to store all unique values emitted by the stream. Therefore, if the stream is infinite, the number of items stored will grow infinitely, never being garbage collected.
Examples
iex> Stream.uniq([1, 2, 3, 2, 1]) |> Enum.to_list
[1, 2, 3]
iex> Stream.uniq([{1, :x}, {2, :y}, {1, :z}], fn {x, _} -> x end) |> Enum.to_list
[{1,:x}, {2,:y}]
Specs:
- with_index(Enumerable.t) :: Enumerable.t
Creates a stream where each item in the enumerable will be wrapped in a tuple alongside its index.
Examples
iex> stream = Stream.with_index([1, 2, 3])
iex> Enum.to_list(stream)
[{1,0},{2,1},{3,2}]
Specs:
- zip(Enumerable.t, Enumerable.t) :: Enumerable.t
Zips two collections together, lazily.
The zipping finishes as soon as any enumerable completes.
Examples
iex> concat = Stream.concat(1..3, 4..6)
iex> cycle = Stream.cycle([:a, :b, :c])
iex> Stream.zip(concat, cycle) |> Enum.to_list
[{1,:a},{2,:b},{3,:c},{4,:a},{5,:b},{6,:c}]
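Because zipping stops when either side completes, one side may be infinite. A brief sketch, assuming an unbounded counter built with Stream.iterate/2 on the left (variable names are illustrative):

```elixir
# An infinite stream of indices: 0, 1, 2, ...
indices = Stream.iterate(0, &(&1 + 1))

# The finite list on the right bounds the result.
pairs = Stream.zip(indices, [:a, :b, :c]) |> Enum.to_list()
```

`pairs` is `[{0, :a}, {1, :b}, {2, :c}]`, and the enumeration terminates even though `indices` never does.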