Keep a separate evaluation queue per section and actually support concurrent evaluation

This commit is contained in:
Jonatan Kłosko 2021-01-12 14:41:05 +01:00
parent ada9f2e317
commit eb8b7480bb
2 changed files with 96 additions and 54 deletions

View file

@ -164,11 +164,7 @@ defmodule LiveBook.Session do
operation = {:queue_cell_evaluation, cell_id} operation = {:queue_cell_evaluation, cell_id}
handle_operation(state, operation, fn new_state -> handle_operation(state, operation, fn new_state ->
if state.data.status == :ready and new_state.data.status == :evaluating do maybe_trigger_evaluations(state, new_state)
{:noreply, trigger_evaluation(new_state)}
else
{:noreply, new_state}
end
end) end)
end end
@ -186,11 +182,7 @@ defmodule LiveBook.Session do
operation = {:add_cell_evaluation_response, cell_id, response} operation = {:add_cell_evaluation_response, cell_id, response}
handle_operation(state, operation, fn new_state -> handle_operation(state, operation, fn new_state ->
if new_state.data.status == :evaluating do maybe_trigger_evaluations(state, new_state)
{:noreply, trigger_evaluation(new_state)}
else
{:noreply, new_state}
end
end) end)
end end
@ -226,9 +218,29 @@ defmodule LiveBook.Session do
Phoenix.PubSub.broadcast(LiveBook.PubSub, "sessions:#{session_id}", message) Phoenix.PubSub.broadcast(LiveBook.PubSub, "sessions:#{session_id}", message)
end end
defp trigger_evaluation(state) do # Compares sections in the old and new state and if a new cell
# has been marked as evaluating it triggers the actual evaluation task.
defp maybe_trigger_evaluations(old_state, new_state) do
Enum.reduce(new_state.data.notebook.sections, new_state, fn section, state ->
case {Data.get_evaluating_cell_id(old_state.data, section.id),
Data.get_evaluating_cell_id(new_state.data, section.id)} do
{_, nil} ->
# No cell to evaluate
state
{cell_id, cell_id} ->
# The evaluating cell hasn't changed, so it must be already evaluating
state
{_, cell_id} ->
# The evaluating cell changed, so we trigger the evaluation to reflect that
trigger_evaluation(state, cell_id)
end
end)
end
defp trigger_evaluation(state, cell_id) do
notebook = state.data.notebook notebook = state.data.notebook
cell_id = Data.get_evaluating_cell_id(state.data)
{:ok, cell} = Notebook.fetch_cell(notebook, cell_id) {:ok, cell} = Notebook.fetch_cell(notebook, cell_id)
{:ok, section} = Notebook.fetch_cell_section(notebook, cell_id) {:ok, section} = Notebook.fetch_cell_section(notebook, cell_id)
{state, evaluator} = get_section_evaluator(state, section.id) {state, evaluator} = get_section_evaluator(state, section.id)

View file

@ -16,9 +16,7 @@ defmodule LiveBook.Session.Data do
defstruct [ defstruct [
:notebook, :notebook,
:status,
:path, :path,
:execution_queue,
:section_infos, :section_infos,
:cell_infos, :cell_infos,
:deleted_sections, :deleted_sections,
@ -30,18 +28,19 @@ defmodule LiveBook.Session.Data do
@type t :: %__MODULE__{ @type t :: %__MODULE__{
notebook: Notebook.t(), notebook: Notebook.t(),
status: status(),
path: nil | String.t(), path: nil | String.t(),
execution_queue: list(Cell.id()),
section_infos: %{Section.id() => section_info()}, section_infos: %{Section.id() => section_info()},
cell_infos: %{Cell.id() => cell_info()}, cell_infos: %{Cell.id() => cell_info()},
deleted_sections: list(Section.t()), deleted_sections: list(Section.t()),
deleted_cells: list(Cell.t()) deleted_cells: list(Cell.t())
} }
@type status :: :idle | :evaluating @type section_info :: %{
status: section_status(),
evaluation_queue: list(Cell.id())
}
@type section_info :: %{} @type section_status :: :idle | :evaluating
@type cell_info :: %{ @type cell_info :: %{
status: cell_status(), status: cell_status(),
@ -71,9 +70,7 @@ defmodule LiveBook.Session.Data do
def new() do def new() do
%__MODULE__{ %__MODULE__{
notebook: Notebook.new(), notebook: Notebook.new(),
status: :idle,
path: nil, path: nil,
execution_queue: [],
section_infos: %{}, section_infos: %{},
cell_infos: %{}, cell_infos: %{},
deleted_sections: [], deleted_sections: [],
@ -90,7 +87,7 @@ defmodule LiveBook.Session.Data do
An operation only applies changes to the structure, but it doesn't trigger An operation only applies changes to the structure, but it doesn't trigger
any actual processing. It's the responsibility of the session process to ensure any actual processing. It's the responsibility of the session process to ensure
the system reflects the new structure. For instance, when the status the system reflects the new structure. For instance, when section status
changes from `:idle` to `:evaluating`, the session process should take changes from `:idle` to `:evaluating`, the session process should take
care of evaluating the appropriate cell. care of evaluating the appropriate cell.
@ -124,16 +121,20 @@ defmodule LiveBook.Session.Data do
end end
def apply_operation(data, {:delete_section, id}) do def apply_operation(data, {:delete_section, id}) do
with {:ok, section} <- Notebook.fetch_section(data.notebook, id) do with {:ok, section} <- Notebook.fetch_section(data.notebook, id),
# If a cell within this section is being evaluated, it should be cancelled first
false <- data.section_infos[section.id].status == :evaluating do
data data
|> delete_section(section) |> delete_section(section)
|> wrap_ok() |> wrap_ok()
else
_ -> :error
end end
end end
def apply_operation(data, {:delete_cell, id}) do def apply_operation(data, {:delete_cell, id}) do
with {:ok, cell} <- Notebook.fetch_cell(data.notebook, id), with {:ok, cell} <- Notebook.fetch_cell(data.notebook, id),
# If the cell is being evaluated, it should be cancelled first. # If the cell is being evaluated, it should be cancelled first
false <- data.cell_infos[cell.id].status == :evaluating do false <- data.cell_infos[cell.id].status == :evaluating do
data data
|> delete_cell(cell) |> delete_cell(cell)
@ -145,7 +146,7 @@ defmodule LiveBook.Session.Data do
def apply_operation(data, {:queue_cell_evaluation, id}) do def apply_operation(data, {:queue_cell_evaluation, id}) do
with {:ok, cell} <- Notebook.fetch_cell(data.notebook, id), with {:ok, cell} <- Notebook.fetch_cell(data.notebook, id),
false <- id in data.evaluation_queue do false <- data.cell_infos[cell.id].status in [:queued, :evaluating] do
data data
|> queue_cell_evaluation(cell) |> queue_cell_evaluation(cell)
|> wrap_ok() |> wrap_ok()
@ -170,6 +171,8 @@ defmodule LiveBook.Session.Data do
end end
end end
# === Actual implementation of each operation ===
#
# The above definitions validate data, so the implementations # The above definitions validate data, so the implementations
# below are focused on making the proper changes. # below are focused on making the proper changes.
@ -196,7 +199,7 @@ defmodule LiveBook.Session.Data do
section_infos: Map.delete(data.section_infos, section.id), section_infos: Map.delete(data.section_infos, section.id),
deleted_sections: [section | data.deleted_sections] deleted_sections: [section | data.deleted_sections]
) )
|> reduce(section.cells, &clear_cell_info_and_evaluation_queue/2) |> reduce(section.cells, &delete_cell_info/2)
end end
defp delete_cell(data, cell) do defp delete_cell(data, cell) do
@ -205,18 +208,17 @@ defmodule LiveBook.Session.Data do
notebook: Notebook.delete_cell(data.notebook, cell.id), notebook: Notebook.delete_cell(data.notebook, cell.id),
deleted_cells: [cell | data.deleted_cells] deleted_cells: [cell | data.deleted_cells]
) )
|> clear_cell_info_and_evaluation_queue(cell) |> delete_cell_info(cell)
end end
defp clear_cell_info_and_evaluation_queue(data, cell) do defp delete_cell_info(data, cell) do
data data
|> set!( |> set!(cell_infos: Map.delete(data.cell_infos, cell.id))
evaluation_queue: List.delete(data.evaluation_queue, cell.id),
cell_infos: Map.delete(data.cell_infos, cell.id)
)
end end
defp queue_cell_evaluation(data, cell) do defp queue_cell_evaluation(data, cell) do
{:ok, section} = Notebook.fetch_cell_section(data.notebook, cell.id)
fresh_parent_cells = fresh_parent_cells =
data.notebook data.notebook
|> Notebook.parent_cells(cell.id) |> Notebook.parent_cells(cell.id)
@ -224,7 +226,9 @@ defmodule LiveBook.Session.Data do
data data
|> reduce(fresh_parent_cells, &queue_cell_evaluation/2) |> reduce(fresh_parent_cells, &queue_cell_evaluation/2)
|> set!(evaluation_queue: data.evaluation_queue ++ [cell.id]) |> set_section_info!(section.id,
evaluation_queue: data.section_infos[section.id].evaluation_queue ++ [cell.id]
)
|> set_cell_info!(cell.id, status: :queued) |> set_cell_info!(cell.id, status: :queued)
|> maybe_evaluate_queued() |> maybe_evaluate_queued()
end end
@ -238,6 +242,8 @@ defmodule LiveBook.Session.Data do
end end
defp add_cell_evaluation_response(data, cell, response) do defp add_cell_evaluation_response(data, cell, response) do
{:ok, section} = Notebook.fetch_cell_section(data.notebook, cell.id)
child_cell_ids = child_cell_ids =
data.notebook data.notebook
|> Notebook.child_cells(cell.id) |> Notebook.child_cells(cell.id)
@ -245,35 +251,41 @@ defmodule LiveBook.Session.Data do
data data
|> set!( |> set!(
status: :ready,
# TODO: add result to outputs # TODO: add result to outputs
notebook: data.notebook notebook: data.notebook
) )
|> set_cell_info!(cell.id, status: :evaluated) |> set_cell_info!(cell.id, status: :evaluated)
|> set_cell_infos!(child_cell_ids, status: :stale) |> set_cell_infos!(child_cell_ids, status: :stale)
|> set_section_info!(section.id, status: :idle)
|> maybe_evaluate_queued() |> maybe_evaluate_queued()
end end
# --- # ===
defp maybe_evaluate_queued(%{status: :ready, evaluation_queue: [id | ids]} = data) do # If there are idle sections with non-empty evaluation queue,
data # the function marks the section and they appropriate cell for evaluation.
|> set!( defp maybe_evaluate_queued(data) do
status: :evaluating, Enum.reduce(data.notebook.sections, data, fn section, data ->
evaluation_queue: ids, case data.section_infos[section.id] do
notebook: Notebook.update_cell(data.notebook, id, &%{&1 | outputs: []}) %{status: :idle, evaluation_queue: [id | ids]} ->
) data
|> set_cell_info!(id, status: :evaluating) |> set!(notebook: Notebook.update_cell(data.notebook, id, &%{&1 | outputs: []}))
|> set_cell_info!(id, status: :evaluating)
|> set_section_info!(section.id, status: :evaluating, evaluation_queue: ids)
_ ->
data
end
end)
end end
defp maybe_evaluate_queued(data), do: data
defp wrap_ok(value), do: {:ok, value} defp wrap_ok(value), do: {:ok, value}
# ---
defp new_section_info() do defp new_section_info() do
%{} %{
status: :idle,
evaluation_queue: []
}
end end
defp new_cell_info() do defp new_cell_info() do
@ -306,19 +318,37 @@ defmodule LiveBook.Session.Data do
Enum.reduce(cell_ids, data, &set_cell_info!(&2, &1, changes)) Enum.reduce(cell_ids, data, &set_cell_info!(&2, &1, changes))
end end
defp set_section_info!(data, section_id, changes) do
section_infos =
Map.update!(data.section_infos, section_id, fn info ->
Enum.reduce(changes, info, fn {key, value}, info ->
Map.replace!(info, key, value)
end)
end)
set!(data, section_infos: section_infos)
end
defp reduce(data, list, reducer) do defp reduce(data, list, reducer) do
Enum.reduce(list, data, fn elem, data -> reducer.(data, elem) end) Enum.reduce(list, data, fn elem, data -> reducer.(data, elem) end)
end end
@doc """ @doc """
Finds the cell that's currently being evaluated. Finds the cell that's currently being evaluated in the given section.
""" """
@spec get_evaluating_cell_id(t()) :: Cell.id() | nil @spec get_evaluating_cell_id(t(), Section.id()) :: Cell.id() | nil
def get_evaluating_cell_id(data) do def get_evaluating_cell_id(data, section_id) do
Enum.find(data.cell_infos, fn {_, info} -> info.status == :evaluating end) case Notebook.fetch_section(data.notebook, section_id) do
|> case do {:ok, section} ->
nil -> nil section.cells
{cell_id, _} -> cell_id |> Enum.find(fn cell -> data.cell_infos[cell.id].status == :evaluating end)
|> case do
nil -> nil
cell -> cell.id
end
:error ->
nil
end end
end end
end end