Task.async_stream


async_stream(enumerable, fun, options \\ [])

(since 1.4.0)

Specs

async_stream(Enumerable.t(), (term() -> term()), keyword()) :: Enumerable.t()

Returns a stream that runs the given function fun concurrently on each element in enumerable.

Works the same as async_stream/5 but with an anonymous function instead of a module-function-arguments tuple. fun must be a one-arity anonymous function.

Each element of enumerable is passed as the argument to the given function fun and processed by its own task. The tasks will be linked to the current process, similarly to async/1.

Example

Count the code points in each string asynchronously, then add the counts together using reduce.

iex> strings = ["long string", "longer string", "there are many of these"]
iex> stream = Task.async_stream(strings, fn text -> text |> String.codepoints() |> Enum.count() end)
iex> Enum.reduce(stream, 0, fn {:ok, num}, acc -> num + acc end)
47

See async_stream/5 for discussion, options, and more examples.

async_stream(enumerable, module, function_name, args, options \\ [])

(since 1.4.0)

Specs

async_stream(Enumerable.t(), module(), atom(), [term()], keyword()) ::
  Enumerable.t()

Returns a stream where the given function (module and function_name) is mapped concurrently on each element in enumerable.

Each element of enumerable will be prepended to the given args and processed by its own task. The tasks will be linked to an intermediate process that is then linked to the current process. This means a failure in a task terminates the current process and a failure in the current process terminates all tasks.
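As a small illustration of how each element is prepended to args, the sketch below uses String.duplicate/2 from the standard library as the target function. That choice is only an assumption for the sake of the example; any module and function with a matching arity would do.

```elixir
# Each element becomes the first argument, followed by the given args,
# so every task calls String.duplicate(element, 3).
stream = Task.async_stream(["a", "b", "c"], String, :duplicate, [3])

# Successful results are wrapped in {:ok, value} tuples and, by default,
# are emitted in input order.
results = Enum.to_list(stream)
IO.inspect(results)
# prints [ok: "aaa", ok: "bbb", ok: "ccc"]
```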

When streamed, each task will emit {:ok, value} upon successful completion or {:exit, reason} if the caller is trapping exits. The order of results depends on the value of the :ordered option.

The level of concurrency and the time tasks are allowed to run can be controlled via options (see the "Options" section below).

Consider using Task.Supervisor.async_stream/6 to start tasks under a supervisor. If you find yourself trapping exits to handle exits inside the async stream, consider using Task.Supervisor.async_stream_nolink/6 to start tasks that are not linked to the calling process.
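A minimal sketch of the non-linked variant follows. It assumes an unnamed supervisor started inline with Task.Supervisor.start_link/0 and uses the anonymous-function form of async_stream_nolink; the failing element and the :boom exit reason are made up for illustration.

```elixir
# Start an unnamed Task.Supervisor for this sketch; a real application
# would normally start a named one under its supervision tree.
{:ok, sup} = Task.Supervisor.start_link()

results =
  sup
  |> Task.Supervisor.async_stream_nolink(1..3, fn
    # Hypothetical failure: the task for element 2 exits abnormally.
    2 -> exit(:boom)
    i -> i * 2
  end)
  |> Enum.to_list()

# The failed task shows up as an {:exit, reason} element in the results
# instead of taking the caller down.
IO.inspect(results)
```

Because the tasks are not linked to the caller, the caller does not need to trap exits to survive the failure.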

Options

  • :max_concurrency - sets the maximum number of tasks to run at the same time. Defaults to System.schedulers_online/0.

  • :ordered - whether the results should be returned in the same order as the input stream. When the output is ordered, Elixir may need to buffer results to emit them in the original order. Setting this option to false disables the need to buffer at the cost of removing ordering. This is also useful when you're using the tasks only for the side effects. Note that regardless of what :ordered is set to, the tasks will process asynchronously. If you need to process elements in order, consider using Enum.map/2 or Enum.each/2 instead. Defaults to true.

  • :timeout - the maximum amount of time (in milliseconds or :infinity) each task is allowed to execute for. Defaults to 5000.

  • :on_timeout - what to do when a task times out. The possible values are:

    • :exit (default) - the process that spawned the tasks exits.
    • :kill_task - the task that timed out is killed. The value emitted for that task is {:exit, :timeout}.
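The options above can be combined. The following sketch, with sleep durations and a timeout picked arbitrarily for illustration, lets a slow task time out without exiting the caller:

```elixir
results =
  [10, 1_000, 10]
  |> Task.async_stream(
    fn ms ->
      # Simulate work that takes `ms` milliseconds.
      Process.sleep(ms)
      ms
    end,
    max_concurrency: 3,
    timeout: 200,
    on_timeout: :kill_task
  )
  |> Enum.to_list()

# The second task exceeds the 200 ms timeout and is killed; the other
# two finish normally. Results keep input order because :ordered
# defaults to true.
IO.inspect(results)
# prints [ok: 10, exit: :timeout, ok: 10]
```

With the default on_timeout: :exit, the same timeout would instead make the calling process exit.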

Example

Let's build a stream and then enumerate it:

stream = Task.async_stream(collection, Mod, :expensive_fun, [])
Enum.to_list(stream)

The concurrency can be increased or decreased using the :max_concurrency option. For example, if the tasks are IO heavy, the value can be increased:

max_concurrency = System.schedulers_online() * 2
stream = Task.async_stream(collection, Mod, :expensive_fun, [], max_concurrency: max_concurrency)
Enum.to_list(stream)

If you do not care about the results of the computation, you can run the stream with Stream.run/1. Also set ordered: false, as you don't care about the order of the results either:

stream = Task.async_stream(collection, Mod, :expensive_fun, [], ordered: false)
Stream.run(stream)
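Since collection and Mod above are placeholders, here is a self-contained sketch of the same idea. The side effect (sending a {:done, i} message back to the caller) is a stand-in chosen for illustration only.

```elixir
parent = self()

# The task function is run only for its side effect: sending a message.
stream =
  Task.async_stream(1..3, fn i -> send(parent, {:done, i}) end, ordered: false)

# Stream.run/1 enumerates the stream and discards the emitted elements.
:ok = Stream.run(stream)

# Collect the side effects; their arrival order is not guaranteed.
received =
  for _ <- 1..3 do
    receive do
      {:done, i} -> i
    after
      1_000 -> :missing
    end
  end

IO.inspect(Enum.sort(received))
# prints [1, 2, 3]
```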

Attention: async + take

Because items in an async stream are processed concurrently, calling async_stream followed by Enum.take/2 may cause more items than requested to be processed. Let's see an example:

1..100
|> Task.async_stream(fn i ->
  Process.sleep(100)
  IO.puts(to_string(i))
end)
|> Enum.take(10)

On a machine with 8 cores, the above will process 16 items instead of 10. The reason is that async_stream/5 always has 8 elements being processed at once. So by the time Enum.take/2 has received the 10 elements it needs, 6 more elements are still being processed.

The solution here is to use Stream.take/2 before async_stream, instead of Enum.take/2 after it, so that the elements are limited beforehand:

1..100
|> Stream.take(10)
|> Task.async_stream(fn i ->
  Process.sleep(100)
  IO.puts(to_string(i))
end)
|> Enum.to_list()

If for some reason you cannot take the elements beforehand, you can use :max_concurrency to limit how many extra elements may be processed, at the cost of reducing concurrency.
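One way to observe the effect of :max_concurrency is to count how many task functions actually run to completion. The sketch below is only an illustration: it assumes Erlang's :counters module (OTP 21.2 or later) as the shared counter, and the sleep time and concurrency limit are arbitrary.

```elixir
# One shared counter slot that can be updated from many processes.
counter = :counters.new(1, [:atomics])

taken =
  1..100
  |> Task.async_stream(
    fn i ->
      Process.sleep(20)
      # Count this element as processed once its work is done.
      :counters.add(counter, 1, 1)
      i
    end,
    max_concurrency: 2
  )
  |> Enum.take(10)

processed = :counters.get(counter, 1)
IO.puts("taken: #{length(taken)}, processed: #{processed}")
```

The processed count is at least 10. How far it overshoots depends on timing and on the concurrency limit, so rerunning with a larger :max_concurrency should generally report more processed elements.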