Make evaluation linear (#73)

* Evaluate stale cells that the run cell depends on

* Make evaluation linear
This commit is contained in:
Jonatan Kłosko 2021-03-09 19:35:39 +01:00 committed by GitHub
parent e65a5f712c
commit ac9b5526fe
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 246 additions and 102 deletions

View file

@ -182,38 +182,39 @@ defmodule Livebook.Notebook do
end end
@doc """ @doc """
Returns a list of Elixir cells that the given cell depends on. Returns a list of `{cell, section}` pairs including all Elixir cells in order.
The cells are ordered starting from the most direct parent.
""" """
@spec parent_cells(t(), Cell.id()) :: list(Cell.t()) @spec elixir_cells_with_section(t()) :: list({Cell.t(), Section.t()})
def parent_cells(notebook, cell_id) do def elixir_cells_with_section(notebook) do
with {:ok, _, section} <- fetch_cell_and_section(notebook, cell_id) do for section <- notebook.sections,
# A cell depends on all previous cells within the same section. cell <- section.cells,
section.cells cell.type == :elixir,
|> Enum.take_while(&(&1.id != cell_id)) do: {cell, section}
|> Enum.reverse()
|> Enum.filter(&(&1.type == :elixir))
else
_ -> []
end
end end
@doc """ @doc """
Returns a list of Elixir cells that depend on the given cell. Returns a list of Elixir cells (each with section) that the given cell depends on.
The cells are ordered starting from the most direct parent.
"""
@spec parent_cells_with_section(t(), Cell.id()) :: list({Cell.t(), Section.t()})
def parent_cells_with_section(notebook, cell_id) do
notebook
|> elixir_cells_with_section()
|> Enum.take_while(fn {cell, _} -> cell.id != cell_id end)
|> Enum.reverse()
end
@doc """
Returns a list of Elixir cells (each with section) that depend on the given cell.
The cells are ordered starting from the most direct child. The cells are ordered starting from the most direct child.
""" """
@spec child_cells(t(), Cell.id()) :: list(Cell.t()) @spec child_cells_with_section(t(), Cell.id()) :: list({Cell.t(), Section.t()})
def child_cells(notebook, cell_id) do def child_cells_with_section(notebook, cell_id) do
with {:ok, _, section} <- fetch_cell_and_section(notebook, cell_id) do notebook
# A cell affects all the cells below it within the same section. |> elixir_cells_with_section()
section.cells |> Enum.drop_while(fn {cell, _} -> cell.id != cell_id end)
|> Enum.drop_while(&(&1.id != cell_id)) |> Enum.drop(1)
|> Enum.drop(1)
|> Enum.filter(&(&1.type == :elixir))
else
_ -> []
end
end end
end end

View file

@ -505,17 +505,17 @@ defmodule Livebook.Session do
start_evaluation(state, cell, section) start_evaluation(state, cell, section)
end end
defp handle_action(state, {:stop_evaluation, section}) do defp handle_action(state, {:stop_evaluation, _section}) do
if state.data.runtime do if state.data.runtime do
Runtime.drop_container(state.data.runtime, section.id) Runtime.drop_container(state.data.runtime, :main)
end end
state state
end end
defp handle_action(state, {:forget_evaluation, cell, section}) do defp handle_action(state, {:forget_evaluation, cell, _section}) do
if state.data.runtime do if state.data.runtime do
Runtime.forget_evaluation(state.data.runtime, section.id, cell.id) Runtime.forget_evaluation(state.data.runtime, :main, cell.id)
end end
state state
@ -539,14 +539,14 @@ defmodule Livebook.Session do
Phoenix.PubSub.broadcast(Livebook.PubSub, "sessions:#{session_id}", message) Phoenix.PubSub.broadcast(Livebook.PubSub, "sessions:#{session_id}", message)
end end
defp start_evaluation(state, cell, section) do defp start_evaluation(state, cell, _section) do
prev_ref = prev_ref =
case Notebook.parent_cells(state.data.notebook, cell.id) do case Notebook.parent_cells_with_section(state.data.notebook, cell.id) do
[parent | _] -> parent.id [{parent, _} | _] -> parent.id
[] -> :initial [] -> :initial
end end
Runtime.evaluate_code(state.data.runtime, cell.source, section.id, cell.id, prev_ref) Runtime.evaluate_code(state.data.runtime, cell.source, :main, cell.id, prev_ref)
state state
end end

View file

@ -192,14 +192,14 @@ defmodule Livebook.Session.Data do
:evaluating -> :evaluating ->
data data
|> with_actions() |> with_actions()
|> clear_section_evaluation(section) |> clear_evaluation()
|> add_action({:stop_evaluation, section}) |> add_action({:stop_evaluation, section})
:queued -> :queued ->
data data
|> with_actions() |> with_actions()
|> unqueue_cell_evaluation(cell, section) |> unqueue_cell_evaluation(cell, section)
|> unqueue_dependent_cells_evaluation(cell, section) |> unqueue_dependent_cells_evaluation(cell)
|> mark_dependent_cells_as_stale(cell) |> mark_dependent_cells_as_stale(cell)
_ -> _ ->
@ -233,7 +233,7 @@ defmodule Livebook.Session.Data do
:ready <- data.cell_infos[cell.id].evaluation_status do :ready <- data.cell_infos[cell.id].evaluation_status do
data data
|> with_actions() |> with_actions()
|> queue_prerequisite_cells_evaluation(cell, section) |> queue_prerequisite_cells_evaluation(cell)
|> queue_cell_evaluation(cell, section) |> queue_cell_evaluation(cell, section)
|> maybe_evaluate_queued() |> maybe_evaluate_queued()
|> wrap_ok() |> wrap_ok()
@ -275,7 +275,7 @@ defmodule Livebook.Session.Data do
:evaluating -> :evaluating ->
data data
|> with_actions() |> with_actions()
|> clear_section_evaluation(section) |> clear_evaluation()
|> add_action({:stop_evaluation, section}) |> add_action({:stop_evaluation, section})
|> wrap_ok() |> wrap_ok()
@ -283,7 +283,7 @@ defmodule Livebook.Session.Data do
data data
|> with_actions() |> with_actions()
|> unqueue_cell_evaluation(cell, section) |> unqueue_cell_evaluation(cell, section)
|> unqueue_dependent_cells_evaluation(cell, section) |> unqueue_dependent_cells_evaluation(cell)
|> mark_dependent_cells_as_stale(cell) |> mark_dependent_cells_as_stale(cell)
|> wrap_ok() |> wrap_ok()
@ -378,7 +378,7 @@ defmodule Livebook.Session.Data do
data data
|> with_actions() |> with_actions()
|> set!(runtime: runtime) |> set!(runtime: runtime)
|> reduce(data.notebook.sections, &clear_section_evaluation/2) |> clear_evaluation()
|> wrap_ok() |> wrap_ok()
end end
@ -448,30 +448,22 @@ defmodule Livebook.Session.Data do
new_idx = (idx + offset) |> clamp_index(section.cells) new_idx = (idx + offset) |> clamp_index(section.cells)
updated_notebook = Notebook.move_cell(data.notebook, section.id, idx, new_idx) updated_notebook = Notebook.move_cell(data.notebook, section.id, idx, new_idx)
{:ok, updated_section} = Notebook.fetch_section(updated_notebook, section.id)
elixir_cell_ids_before = elixir_cell_ids(section.cells) cells_with_section_before = Notebook.elixir_cells_with_section(data.notebook)
elixir_cell_ids_after = elixir_cell_ids(updated_section.cells) cells_with_section_after = Notebook.elixir_cells_with_section(updated_notebook)
# If the order of Elixir cells stays the same, no need to invalidate anything affected_cells_with_section =
affected_cells = cells_with_section_after
if elixir_cell_ids_before != elixir_cell_ids_after do |> Enum.zip(cells_with_section_before)
affected_from_idx = min(idx, new_idx) |> Enum.drop_while(fn {{cell_before, _}, {cell_after, _}} ->
Enum.slice(updated_section.cells, affected_from_idx..-1) cell_before.id == cell_after.id
else end)
[] |> Enum.map(fn {new, _old} -> new end)
end
data_actions data_actions
|> set!(notebook: updated_notebook) |> set!(notebook: updated_notebook)
|> mark_cells_as_stale(affected_cells) |> mark_cells_as_stale(affected_cells_with_section)
|> unqueue_cells_evaluation(affected_cells, section) |> unqueue_cells_evaluation(affected_cells_with_section)
end
defp elixir_cell_ids(cells) do
cells
|> Enum.filter(&(&1.type == :elixir))
|> Enum.map(& &1.id)
end end
defp clamp_index(index, list) do defp clamp_index(index, list) do
@ -534,13 +526,15 @@ defmodule Livebook.Session.Data do
end end
defp mark_dependent_cells_as_stale({data, _} = data_actions, cell) do defp mark_dependent_cells_as_stale({data, _} = data_actions, cell) do
child_cells = Notebook.child_cells(data.notebook, cell.id) child_cells = Notebook.child_cells_with_section(data.notebook, cell.id)
mark_cells_as_stale(data_actions, child_cells) mark_cells_as_stale(data_actions, child_cells)
end end
defp mark_cells_as_stale({data, _} = data_actions, cells) do defp mark_cells_as_stale({data, _} = data_actions, cells_with_section) do
invalidated_cells = invalidated_cells =
Enum.filter(cells, fn cell -> cells_with_section
|> Enum.map(fn {cell, _section} -> cell end)
|> Enum.filter(fn cell ->
cell.type == :elixir and data.cell_infos[cell.id].validity_status == :evaluated cell.type == :elixir and data.cell_infos[cell.id].validity_status == :evaluated
end) end)
@ -548,26 +542,39 @@ defmodule Livebook.Session.Data do
|> reduce(invalidated_cells, &set_cell_info!(&1, &2.id, validity_status: :stale)) |> reduce(invalidated_cells, &set_cell_info!(&1, &2.id, validity_status: :stale))
end end
# If there are idle sections with non-empty evaluation queue,
# the next queued cell for evaluation.
defp maybe_evaluate_queued({data, _} = data_actions) do defp maybe_evaluate_queued({data, _} = data_actions) do
Enum.reduce(data.notebook.sections, data_actions, fn section, data_actions -> ongoing_evaluation? =
{data, _} = data_actions Enum.any?(data.notebook.sections, fn section ->
data.section_infos[section.id].evaluating_cell_id != nil
end)
case data.section_infos[section.id] do if ongoing_evaluation? do
%{evaluating_cell_id: nil, evaluation_queue: [id | ids]} -> # A section is evaluating, so we don't start any new evaluation
cell = Enum.find(section.cells, &(&1.id == id)) data_actions
else
Enum.find_value(data.notebook.sections, data_actions, fn section ->
case data.section_infos[section.id] do
%{evaluating_cell_id: nil, evaluation_queue: [id | ids]} ->
# The section is idle and has cells queued for evaluation, so let's start the evaluation
cell = Enum.find(section.cells, &(&1.id == id))
data_actions data_actions
|> set!(notebook: Notebook.update_cell(data.notebook, id, &%{&1 | outputs: []})) |> set!(notebook: Notebook.update_cell(data.notebook, id, &%{&1 | outputs: []}))
|> set_cell_info!(id, evaluation_status: :evaluating) |> set_cell_info!(id, evaluation_status: :evaluating)
|> set_section_info!(section.id, evaluating_cell_id: id, evaluation_queue: ids) |> set_section_info!(section.id, evaluating_cell_id: id, evaluation_queue: ids)
|> add_action({:start_evaluation, cell, section}) |> add_action({:start_evaluation, cell, section})
_ -> _ ->
data_actions # The section is neither evaluating nor queued, so let's check the next section
end nil
end) end
end)
end
end
defp clear_evaluation({data, _} = data_actions) do
data_actions
|> reduce(data.notebook.sections, &clear_section_evaluation/2)
end end
defp clear_section_evaluation(data_actions, section) do defp clear_section_evaluation(data_actions, section) do
@ -579,31 +586,37 @@ defmodule Livebook.Session.Data do
) )
end end
defp queue_prerequisite_cells_evaluation({data, _} = data_actions, cell, section) do defp queue_prerequisite_cells_evaluation({data, _} = data_actions, cell) do
prerequisites_queue = prerequisites_queue =
data.notebook data.notebook
|> Notebook.parent_cells(cell.id) |> Notebook.parent_cells_with_section(cell.id)
|> Enum.take_while(fn parent -> |> Enum.take_while(fn {parent_cell, _section} ->
info = data.cell_infos[parent.id] info = data.cell_infos[parent_cell.id]
info.validity_status == :fresh and info.evaluation_status == :ready info.validity_status != :evaluated and info.evaluation_status == :ready
end) end)
|> Enum.reverse() |> Enum.reverse()
data_actions data_actions
|> reduce(prerequisites_queue, &queue_cell_evaluation(&1, &2, section)) |> reduce(prerequisites_queue, fn data_actions, {cell, section} ->
queue_cell_evaluation(data_actions, cell, section)
end)
end end
defp unqueue_dependent_cells_evaluation({data, _} = data_actions, cell, section) do defp unqueue_dependent_cells_evaluation({data, _} = data_actions, cell) do
dependent_cells = Notebook.child_cells(data.notebook, cell.id) dependent_cells = Notebook.child_cells_with_section(data.notebook, cell.id)
unqueue_cells_evaluation(data_actions, dependent_cells, section) unqueue_cells_evaluation(data_actions, dependent_cells)
end end
defp unqueue_cells_evaluation({data, _} = data_actions, cells, section) do defp unqueue_cells_evaluation({data, _} = data_actions, cells_with_section) do
queued_cells = queued_cells_with_section =
Enum.filter(cells, fn cell -> data.cell_infos[cell.id].evaluation_status == :queued end) Enum.filter(cells_with_section, fn {cell, _} ->
data.cell_infos[cell.id].evaluation_status == :queued
end)
data_actions data_actions
|> reduce(queued_cells, &unqueue_cell_evaluation(&1, &2, section)) |> reduce(queued_cells_with_section, fn data_actions, {cell, section} ->
unqueue_cell_evaluation(data_actions, cell, section)
end)
end end
defp set_notebook_name({data, _} = data_actions, name) do defp set_notebook_name({data, _} = data_actions, name) do

View file

@ -374,9 +374,7 @@ defmodule LivebookWeb.SessionLive do
socket.assigns.focused_cell_id socket.assigns.focused_cell_id
) )
cells = Notebook.child_cells(socket.assigns.data.notebook, cell.id) for {cell, _} <- Notebook.child_cells_with_section(socket.assigns.data.notebook, cell.id) do
for cell <- cells, cell.type == :elixir do
Session.queue_cell_evaluation(socket.assigns.session_id, cell.id) Session.queue_cell_evaluation(socket.assigns.session_id, cell.id)
end end
end end

View file

@ -309,6 +309,31 @@ defmodule Livebook.Session.DataTest do
}, []} = Data.apply_operation(data, operation) }, []} = Data.apply_operation(data, operation)
end end
test "marks relevant cells in further sections as stale" do
data =
data_after_operations!([
{:insert_section, self(), 0, "s1"},
{:insert_cell, self(), "s1", 0, :elixir, "c1"},
{:insert_cell, self(), "s1", 1, :elixir, "c2"},
{:insert_section, self(), 1, "s2"},
{:insert_cell, self(), "s2", 0, :elixir, "c3"},
# Evaluate cells
{:queue_cell_evaluation, self(), "c1"},
{:add_cell_evaluation_response, self(), "c1", {:ok, nil}},
{:queue_cell_evaluation, self(), "c2"},
{:add_cell_evaluation_response, self(), "c2", {:ok, nil}},
{:queue_cell_evaluation, self(), "c3"},
{:add_cell_evaluation_response, self(), "c3", {:ok, nil}}
])
operation = {:move_cell, self(), "c1", 1}
assert {:ok,
%{
cell_infos: %{"c3" => %{validity_status: :stale}}
}, []} = Data.apply_operation(data, operation)
end
test "moving a markdown cell does not change validity" do test "moving a markdown cell does not change validity" do
data = data =
data_after_operations!([ data_after_operations!([
@ -456,6 +481,75 @@ defmodule Livebook.Session.DataTest do
section_infos: %{"s1" => %{evaluating_cell_id: "c1", evaluation_queue: ["c2"]}} section_infos: %{"s1" => %{evaluating_cell_id: "c1", evaluation_queue: ["c2"]}}
}, []} = Data.apply_operation(data, operation) }, []} = Data.apply_operation(data, operation)
end end
test "marks the cell as queued if a previous section is already evaluating" do
data =
data_after_operations!([
{:insert_section, self(), 0, "s1"},
{:insert_cell, self(), "s1", 0, :elixir, "c1"},
{:insert_section, self(), 1, "s2"},
{:insert_cell, self(), "s2", 0, :elixir, "c2"},
{:queue_cell_evaluation, self(), "c1"}
])
operation = {:queue_cell_evaluation, self(), "c2"}
assert {:ok,
%{
cell_infos: %{"c2" => %{evaluation_status: :queued}},
section_infos: %{
"s1" => %{evaluating_cell_id: "c1", evaluation_queue: []},
"s2" => %{evaluating_cell_id: nil, evaluation_queue: ["c2"]}
}
}, []} = Data.apply_operation(data, operation)
end
test "queues previous unevaluated and stale cells" do
data =
data_after_operations!([
{:insert_section, self(), 0, "s1"},
{:insert_cell, self(), "s1", 0, :elixir, "c1"},
{:insert_cell, self(), "s1", 1, :elixir, "c2"},
{:insert_section, self(), 1, "s2"},
{:insert_cell, self(), "s2", 0, :elixir, "c3"},
{:insert_cell, self(), "s2", 1, :elixir, "c4"},
# Evaluate first 2 cells
{:queue_cell_evaluation, self(), "c1"},
{:add_cell_evaluation_response, self(), "c1", {:ok, [1, 2, 3]}},
{:queue_cell_evaluation, self(), "c2"},
{:add_cell_evaluation_response, self(), "c2", {:ok, [1, 2, 3]}},
# Evaluate the first cell, so the second becomes stale
{:queue_cell_evaluation, self(), "c1"},
{:add_cell_evaluation_response, self(), "c1", {:ok, [1, 2, 3]}}
])
# The above leads to:
#
# Section 1:
# * cell 1 - evaluated
# * cell 2 - stale
# Section 2:
# * cell 3 - fresh
# * cell 4 - fresh
#
# Queuing cell 4 should also queue cell 3 and cell 2,
# so that they all become evaluated.
operation = {:queue_cell_evaluation, self(), "c4"}
assert {:ok,
%{
cell_infos: %{
"c2" => %{evaluation_status: :evaluating},
"c3" => %{evaluation_status: :queued},
"c4" => %{evaluation_status: :queued}
},
section_infos: %{
"s1" => %{evaluating_cell_id: "c2", evaluation_queue: []},
"s2" => %{evaluating_cell_id: nil, evaluation_queue: ["c3", "c4"]}
}
}, _actions} = Data.apply_operation(data, operation)
end
end end
describe "apply_operation/2 given :add_cell_evaluation_stdout" do describe "apply_operation/2 given :add_cell_evaluation_stdout" do
@ -567,6 +661,31 @@ defmodule Livebook.Session.DataTest do
Data.apply_operation(data, operation) Data.apply_operation(data, operation)
end end
test "marks next queued cell in a further section as evaluating if there is one" do
data =
data_after_operations!([
{:insert_section, self(), 0, "s1"},
{:insert_cell, self(), "s1", 0, :elixir, "c1"},
{:insert_section, self(), 1, "s2"},
{:insert_cell, self(), "s2", 0, :elixir, "c2"},
{:queue_cell_evaluation, self(), "c1"},
{:queue_cell_evaluation, self(), "c2"}
])
operation = {:add_cell_evaluation_response, self(), "c1", {:ok, [1, 2, 3]}}
assert {:ok,
%{
cell_infos: %{"c2" => %{evaluation_status: :evaluating}},
section_infos: %{
"s1" => %{evaluating_cell_id: nil, evaluation_queue: []},
"s2" => %{evaluating_cell_id: "c2", evaluation_queue: []}
}
},
[{:start_evaluation, %{id: "c2"}, %{id: "s2"}}]} =
Data.apply_operation(data, operation)
end
test "if parent cells are not executed, marks them for evaluation first" do test "if parent cells are not executed, marks them for evaluation first" do
data = data =
data_after_operations!([ data_after_operations!([
@ -595,11 +714,15 @@ defmodule Livebook.Session.DataTest do
{:insert_section, self(), 0, "s1"}, {:insert_section, self(), 0, "s1"},
{:insert_cell, self(), "s1", 0, :elixir, "c1"}, {:insert_cell, self(), "s1", 0, :elixir, "c1"},
{:insert_cell, self(), "s1", 1, :elixir, "c2"}, {:insert_cell, self(), "s1", 1, :elixir, "c2"},
# Evaluate both cells {:insert_section, self(), 1, "s2"},
{:insert_cell, self(), "s2", 0, :elixir, "c3"},
# Evaluate all cells
{:queue_cell_evaluation, self(), "c1"}, {:queue_cell_evaluation, self(), "c1"},
{:add_cell_evaluation_response, self(), "c1", {:ok, [1, 2, 3]}}, {:add_cell_evaluation_response, self(), "c1", {:ok, [1, 2, 3]}},
{:queue_cell_evaluation, self(), "c2"}, {:queue_cell_evaluation, self(), "c2"},
{:add_cell_evaluation_response, self(), "c2", {:ok, [1, 2, 3]}}, {:add_cell_evaluation_response, self(), "c2", {:ok, [1, 2, 3]}},
{:queue_cell_evaluation, self(), "c3"},
{:add_cell_evaluation_response, self(), "c3", {:ok, [1, 2, 3]}},
# Queue the first cell again # Queue the first cell again
{:queue_cell_evaluation, self(), "c1"} {:queue_cell_evaluation, self(), "c1"}
]) ])
@ -608,7 +731,10 @@ defmodule Livebook.Session.DataTest do
assert {:ok, assert {:ok,
%{ %{
cell_infos: %{"c2" => %{validity_status: :stale}} cell_infos: %{
"c2" => %{validity_status: :stale},
"c3" => %{validity_status: :stale}
}
}, []} = Data.apply_operation(data, operation) }, []} = Data.apply_operation(data, operation)
end end
end end
@ -633,26 +759,32 @@ defmodule Livebook.Session.DataTest do
assert :error = Data.apply_operation(data, operation) assert :error = Data.apply_operation(data, operation)
end end
test "if the cell is evaluating, clears the corresponding section evaluation and the queue" do test "if the cell is evaluating, clears all sections evaluation and queues" do
data = data =
data_after_operations!([ data_after_operations!([
{:insert_section, self(), 0, "s1"}, {:insert_section, self(), 0, "s1"},
{:insert_cell, self(), "s1", 0, :elixir, "c1"}, {:insert_cell, self(), "s1", 0, :elixir, "c1"},
{:insert_cell, self(), "s1", 1, :elixir, "c2"}, {:insert_cell, self(), "s1", 1, :elixir, "c2"},
{:insert_section, self(), 1, "s2"},
{:insert_cell, self(), "s2", 0, :elixir, "c3"},
{:queue_cell_evaluation, self(), "c1"}, {:queue_cell_evaluation, self(), "c1"},
{:queue_cell_evaluation, self(), "c2"} {:add_cell_evaluation_response, self(), "c1", {:ok, [1, 2, 3]}},
{:queue_cell_evaluation, self(), "c2"},
{:queue_cell_evaluation, self(), "c3"}
]) ])
operation = {:cancel_cell_evaluation, self(), "c1"} operation = {:cancel_cell_evaluation, self(), "c2"}
assert {:ok, assert {:ok,
%{ %{
cell_infos: %{ cell_infos: %{
"c1" => %{validity_status: :fresh}, "c1" => %{validity_status: :fresh, evaluation_status: :ready},
"c2" => %{validity_status: :fresh} "c2" => %{validity_status: :fresh, evaluation_status: :ready},
"c3" => %{validity_status: :fresh, evaluation_status: :ready}
}, },
section_infos: %{ section_infos: %{
"s1" => %{evaluating_cell_id: nil, evaluation_queue: []} "s1" => %{evaluating_cell_id: nil, evaluation_queue: []},
"s2" => %{evaluating_cell_id: nil, evaluation_queue: []}
} }
}, _actions} = Data.apply_operation(data, operation) }, _actions} = Data.apply_operation(data, operation)
end end