mirror of
https://github.com/livebook-dev/livebook.git
synced 2024-12-26 09:22:00 +08:00
Implement evaluation cancellation (#7)
* Implement evaluation cancellation * Forger cell evaluation on deletion * Further operation fixes * Implement new side effects approach
This commit is contained in:
parent
00b06f6e7a
commit
8beeb48d1b
4 changed files with 353 additions and 139 deletions
|
@ -96,6 +96,14 @@ defmodule LiveBook.Session do
|
|||
GenServer.cast(name(session_id), {:queue_cell_evaluation, cell_id})
|
||||
end
|
||||
|
||||
@doc """
|
||||
Asynchronously sends cell evaluation cancellation request to the server.
|
||||
"""
|
||||
@spec cancel_cell_evaluation(id(), Cell.id()) :: :ok
|
||||
def cancel_cell_evaluation(session_id, cell_id) do
|
||||
GenServer.cast(name(session_id), {:cancel_cell_evaluation, cell_id})
|
||||
end
|
||||
|
||||
@doc """
|
||||
Synchronously stops the server.
|
||||
"""
|
||||
|
@ -137,10 +145,7 @@ defmodule LiveBook.Session do
|
|||
|
||||
def handle_cast({:delete_section, section_id}, state) do
|
||||
operation = {:delete_section, section_id}
|
||||
|
||||
handle_operation(state, operation, fn new_state ->
|
||||
delete_section_evaluator(new_state, section_id)
|
||||
end)
|
||||
handle_operation(state, operation)
|
||||
end
|
||||
|
||||
def handle_cast({:delete_cell, cell_id}, state) do
|
||||
|
@ -150,10 +155,12 @@ defmodule LiveBook.Session do
|
|||
|
||||
def handle_cast({:queue_cell_evaluation, cell_id}, state) do
|
||||
operation = {:queue_cell_evaluation, cell_id}
|
||||
handle_operation(state, operation)
|
||||
end
|
||||
|
||||
handle_operation(state, operation, fn new_state ->
|
||||
maybe_trigger_evaluations(state, new_state)
|
||||
end)
|
||||
def handle_cast({:cancel_cell_evaluation, cell_id}, state) do
|
||||
operation = {:cancel_cell_evaluation, cell_id}
|
||||
handle_operation(state, operation)
|
||||
end
|
||||
|
||||
@impl true
|
||||
|
@ -168,10 +175,7 @@ defmodule LiveBook.Session do
|
|||
|
||||
def handle_info({:evaluator_response, cell_id, response}, state) do
|
||||
operation = {:add_cell_evaluation_response, cell_id, response}
|
||||
|
||||
handle_operation(state, operation, fn new_state ->
|
||||
maybe_trigger_evaluations(state, new_state)
|
||||
end)
|
||||
handle_operation(state, operation)
|
||||
end
|
||||
|
||||
# ---
|
||||
|
@ -181,71 +185,67 @@ defmodule LiveBook.Session do
|
|||
# * broadcasts the operation to all clients immediately,
|
||||
# so that they can update their local `Data`
|
||||
# * applies the operation to own local `Data`
|
||||
# * optionally performs a relevant task (e.g. starts cell evaluation),
|
||||
# * if necessary, performs the relevant tasks (e.g. starts cell evaluation),
|
||||
# to reflect the new `Data`
|
||||
#
|
||||
defp handle_operation(state, operation) do
|
||||
handle_operation(state, operation, fn state -> state end)
|
||||
end
|
||||
|
||||
defp handle_operation(state, operation, handle_new_state) do
|
||||
broadcast_operation(state.session_id, operation)
|
||||
|
||||
case Data.apply_operation(state.data, operation) do
|
||||
{:ok, new_data} ->
|
||||
{:ok, new_data, actions} ->
|
||||
new_state = %{state | data: new_data}
|
||||
{:noreply, handle_new_state.(new_state)}
|
||||
{:noreply, handle_actions(new_state, actions)}
|
||||
|
||||
:error ->
|
||||
{:noreply, state}
|
||||
end
|
||||
end
|
||||
|
||||
defp handle_actions(state, actions) do
|
||||
Enum.reduce(actions, state, &handle_action(&2, &1))
|
||||
end
|
||||
|
||||
defp handle_action(state, {:start_evaluation, cell, section}) do
|
||||
trigger_evaluation(state, cell, section)
|
||||
end
|
||||
|
||||
defp handle_action(state, {:stop_evaluation, section}) do
|
||||
delete_section_evaluator(state, section.id)
|
||||
end
|
||||
|
||||
defp handle_action(state, {:forget_evaluation, cell, section}) do
|
||||
with {:ok, evaluator} <- fetch_section_evaluator(state, section.id) do
|
||||
Evaluator.forget_evaluation(evaluator, cell.id)
|
||||
end
|
||||
|
||||
state
|
||||
end
|
||||
|
||||
defp broadcast_operation(session_id, operation) do
|
||||
message = {:operation, operation}
|
||||
Phoenix.PubSub.broadcast(LiveBook.PubSub, "sessions:#{session_id}", message)
|
||||
end
|
||||
|
||||
# Compares sections in the old and new state and if a new cell
|
||||
# has been marked as evaluating it triggers the actual evaluation task.
|
||||
defp maybe_trigger_evaluations(old_state, new_state) do
|
||||
Enum.reduce(new_state.data.notebook.sections, new_state, fn section, state ->
|
||||
case {Data.get_evaluating_cell_id(old_state.data, section.id),
|
||||
Data.get_evaluating_cell_id(new_state.data, section.id)} do
|
||||
{_, nil} ->
|
||||
# No cell to evaluate
|
||||
state
|
||||
|
||||
{cell_id, cell_id} ->
|
||||
# The evaluating cell hasn't changed, so it must be already evaluating
|
||||
state
|
||||
|
||||
{_, cell_id} ->
|
||||
# The evaluating cell changed, so we trigger the evaluation to reflect that
|
||||
trigger_evaluation(state, cell_id)
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
defp trigger_evaluation(state, cell_id) do
|
||||
notebook = state.data.notebook
|
||||
{:ok, cell, section} = Notebook.fetch_cell_and_section(notebook, cell_id)
|
||||
defp trigger_evaluation(state, cell, section) do
|
||||
{state, evaluator} = get_section_evaluator(state, section.id)
|
||||
%{source: source} = cell
|
||||
|
||||
prev_ref =
|
||||
case Notebook.parent_cells(notebook, cell_id) do
|
||||
case Notebook.parent_cells(state.data.notebook, cell.id) do
|
||||
[parent | _] -> parent.id
|
||||
[] -> :initial
|
||||
end
|
||||
|
||||
Evaluator.evaluate_code(evaluator, self(), source, cell_id, prev_ref)
|
||||
Evaluator.evaluate_code(evaluator, self(), cell.source, cell.id, prev_ref)
|
||||
|
||||
state
|
||||
end
|
||||
|
||||
defp fetch_section_evaluator(state, section_id) do
|
||||
Map.fetch(state.evaluators, section_id)
|
||||
end
|
||||
|
||||
defp get_section_evaluator(state, section_id) do
|
||||
case Map.fetch(state.evaluators, section_id) do
|
||||
case fetch_section_evaluator(state, section_id) do
|
||||
{:ok, evaluator} ->
|
||||
{state, evaluator}
|
||||
|
||||
|
@ -257,7 +257,7 @@ defmodule LiveBook.Session do
|
|||
end
|
||||
|
||||
defp delete_section_evaluator(state, section_id) do
|
||||
case Map.fetch(state.evaluators, section_id) do
|
||||
case fetch_section_evaluator(state, section_id) do
|
||||
{:ok, evaluator} ->
|
||||
EvaluatorSupervisor.terminate_evaluator(evaluator)
|
||||
%{state | evaluators: Map.delete(state.evaluators, section_id)}
|
||||
|
|
|
@ -60,6 +60,12 @@ defmodule LiveBook.Session.Data do
|
|||
| {:queue_cell_evaluation, Cell.id()}
|
||||
| {:add_cell_evaluation_stdout, Cell.id(), String.t()}
|
||||
| {:add_cell_evaluation_response, Cell.id(), Evaluator.evaluation_response()}
|
||||
| {:cancel_cell_evaluation, Cell.id()}
|
||||
|
||||
@type action ::
|
||||
{:start_evaluation, Cell.t(), Section.t()}
|
||||
| {:stop_evaluation, Section.t()}
|
||||
| {:forget_evaluation, Cell.t(), Section.t()}
|
||||
|
||||
@doc """
|
||||
Returns a fresh notebook session state.
|
||||
|
@ -88,21 +94,25 @@ defmodule LiveBook.Session.Data do
|
|||
the system reflects the new structure. For instance, when a new cell is marked
|
||||
as evaluating, the session process should take care of triggering actual evaluation.
|
||||
|
||||
Returns `{:ok, data}` on correct application or `:error` if the operation
|
||||
is not valid. The `:error` is generally expected given the collaborative
|
||||
nature of sessions. For example if there are simultaneous deletion
|
||||
and evaluation operations on the same cell, we may perform delete first,
|
||||
Returns `{:ok, data, actions}` if the operation is valid, where `data` is the result
|
||||
of applying said operation to the given data, and `actions` is a list
|
||||
of side effects that should be performed for the new data to hold true.
|
||||
|
||||
Returns `:error` if the operation is not valid. The `:error` is generally
|
||||
expected given the collaborative nature of sessions. For example if there are
|
||||
simultaneous deletion and evaluation operations on the same cell, we may perform delete first,
|
||||
in which case the evaluation is no longer valid (there's no cell with the given id).
|
||||
By returning `:error` we simply notify the caller that no changes were applied,
|
||||
so any related actions can be ignored.
|
||||
"""
|
||||
@spec apply_operation(t(), operation()) :: {:ok, t()} | :error
|
||||
@spec apply_operation(t(), operation()) :: {:ok, t(), list(action())} | :error
|
||||
def apply_operation(data, operation)
|
||||
|
||||
def apply_operation(data, {:insert_section, index, id}) do
|
||||
section = %{Section.new() | id: id}
|
||||
|
||||
data
|
||||
|> with_actions()
|
||||
|> insert_section(index, section)
|
||||
|> wrap_ok()
|
||||
end
|
||||
|
@ -112,34 +122,44 @@ defmodule LiveBook.Session.Data do
|
|||
cell = %{Cell.new(type) | id: id}
|
||||
|
||||
data
|
||||
|> with_actions()
|
||||
|> insert_cell(section_id, index, cell)
|
||||
|> wrap_ok()
|
||||
end
|
||||
end
|
||||
|
||||
def apply_operation(data, {:delete_section, id}) do
|
||||
with {:ok, section} <- Notebook.fetch_section(data.notebook, id),
|
||||
# If a cell within this section is being evaluated, it should be cancelled first
|
||||
nil <- data.section_infos[section.id].evaluating_cell_id do
|
||||
with {:ok, section} <- Notebook.fetch_section(data.notebook, id) do
|
||||
data
|
||||
|> with_actions()
|
||||
|> delete_section(section)
|
||||
|> wrap_ok()
|
||||
else
|
||||
_ -> :error
|
||||
end
|
||||
end
|
||||
|
||||
def apply_operation(data, {:delete_cell, id}) do
|
||||
with {:ok, cell, section} <- Notebook.fetch_cell_and_section(data.notebook, id),
|
||||
# If the cell is being evaluated, it should be cancelled first
|
||||
false <- data.cell_infos[cell.id].status == :evaluating do
|
||||
data
|
||||
|> unqueue_cell_evaluation_if_any(cell, section)
|
||||
|> mark_dependent_cells_as_stale(cell)
|
||||
with {:ok, cell, section} <- Notebook.fetch_cell_and_section(data.notebook, id) do
|
||||
case data.cell_infos[cell.id].status do
|
||||
:evaluating ->
|
||||
data
|
||||
|> with_actions()
|
||||
|> clear_section_evaluation(section)
|
||||
|
||||
:queued ->
|
||||
data
|
||||
|> with_actions()
|
||||
|> unqueue_cell_evaluation(cell, section)
|
||||
|> unqueue_dependent_cells_evaluation(cell, section)
|
||||
|> mark_dependent_cells_as_stale(cell)
|
||||
|
||||
_ ->
|
||||
data
|
||||
|> with_actions()
|
||||
|> mark_dependent_cells_as_stale(cell)
|
||||
end
|
||||
|> delete_cell(cell)
|
||||
|> add_action({:forget_evaluation, cell, section})
|
||||
|> wrap_ok()
|
||||
else
|
||||
_ -> :error
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -147,10 +167,9 @@ defmodule LiveBook.Session.Data do
|
|||
with {:ok, cell, section} <- Notebook.fetch_cell_and_section(data.notebook, id),
|
||||
:elixir <- cell.type,
|
||||
false <- data.cell_infos[cell.id].status in [:queued, :evaluating] do
|
||||
prerequisites_queue = fresh_parent_cells_queue(data, cell)
|
||||
|
||||
data
|
||||
|> reduce(prerequisites_queue, &queue_cell_evaluation(&1, &2, section))
|
||||
|> with_actions()
|
||||
|> queue_prerequisite_cells_evaluation(cell, section)
|
||||
|> queue_cell_evaluation(cell, section)
|
||||
|> maybe_evaluate_queued()
|
||||
|> wrap_ok()
|
||||
|
@ -162,6 +181,7 @@ defmodule LiveBook.Session.Data do
|
|||
def apply_operation(data, {:add_cell_evaluation_stdout, id, string}) do
|
||||
with {:ok, cell, _} <- Notebook.fetch_cell_and_section(data.notebook, id) do
|
||||
data
|
||||
|> with_actions()
|
||||
|> add_cell_evaluation_stdout(cell, string)
|
||||
|> wrap_ok()
|
||||
end
|
||||
|
@ -170,6 +190,7 @@ defmodule LiveBook.Session.Data do
|
|||
def apply_operation(data, {:add_cell_evaluation_response, id, response}) do
|
||||
with {:ok, cell, section} <- Notebook.fetch_cell_and_section(data.notebook, id) do
|
||||
data
|
||||
|> with_actions()
|
||||
|> add_cell_evaluation_response(cell, response)
|
||||
|> finish_cell_evaluation(cell, section)
|
||||
|> mark_dependent_cells_as_stale(cell)
|
||||
|
@ -178,26 +199,53 @@ defmodule LiveBook.Session.Data do
|
|||
end
|
||||
end
|
||||
|
||||
def apply_operation(data, {:cancel_cell_evaluation, id}) do
|
||||
with {:ok, cell, section} <- Notebook.fetch_cell_and_section(data.notebook, id) do
|
||||
case data.cell_infos[cell.id].status do
|
||||
:evaluating ->
|
||||
data
|
||||
|> with_actions()
|
||||
|> clear_section_evaluation(section)
|
||||
|> wrap_ok()
|
||||
|
||||
:queued ->
|
||||
data
|
||||
|> with_actions()
|
||||
|> unqueue_cell_evaluation(cell, section)
|
||||
|> unqueue_dependent_cells_evaluation(cell, section)
|
||||
|> mark_dependent_cells_as_stale(cell)
|
||||
|> wrap_ok()
|
||||
|
||||
_ ->
|
||||
:error
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# ===
|
||||
|
||||
defp insert_section(data, index, section) do
|
||||
data
|
||||
defp with_actions(data, actions \\ []), do: {data, actions}
|
||||
|
||||
defp wrap_ok({data, actions}), do: {:ok, data, actions}
|
||||
|
||||
defp insert_section({data, _} = data_actions, index, section) do
|
||||
data_actions
|
||||
|> set!(
|
||||
notebook: Notebook.insert_section(data.notebook, index, section),
|
||||
section_infos: Map.put(data.section_infos, section.id, new_section_info())
|
||||
)
|
||||
end
|
||||
|
||||
defp insert_cell(data, section_id, index, cell) do
|
||||
data
|
||||
defp insert_cell({data, _} = data_actions, section_id, index, cell) do
|
||||
data_actions
|
||||
|> set!(
|
||||
notebook: Notebook.insert_cell(data.notebook, section_id, index, cell),
|
||||
cell_infos: Map.put(data.cell_infos, cell.id, new_cell_info())
|
||||
)
|
||||
end
|
||||
|
||||
defp delete_section(data, section) do
|
||||
data
|
||||
defp delete_section({data, _} = data_actions, section) do
|
||||
data_actions
|
||||
|> set!(
|
||||
notebook: Notebook.delete_section(data.notebook, section.id),
|
||||
section_infos: Map.delete(data.section_infos, section.id),
|
||||
|
@ -206,8 +254,8 @@ defmodule LiveBook.Session.Data do
|
|||
|> reduce(section.cells, &delete_cell_info/2)
|
||||
end
|
||||
|
||||
defp delete_cell(data, cell) do
|
||||
data
|
||||
defp delete_cell({data, _} = data_actions, cell) do
|
||||
data_actions
|
||||
|> set!(
|
||||
notebook: Notebook.delete_cell(data.notebook, cell.id),
|
||||
deleted_cells: [cell | data.deleted_cells]
|
||||
|
@ -215,88 +263,115 @@ defmodule LiveBook.Session.Data do
|
|||
|> delete_cell_info(cell)
|
||||
end
|
||||
|
||||
defp delete_cell_info(data, cell) do
|
||||
data
|
||||
defp delete_cell_info({data, _} = data_actions, cell) do
|
||||
data_actions
|
||||
|> set!(cell_infos: Map.delete(data.cell_infos, cell.id))
|
||||
end
|
||||
|
||||
defp queue_cell_evaluation(data, cell, section) do
|
||||
data
|
||||
defp queue_cell_evaluation(data_actions, cell, section) do
|
||||
data_actions
|
||||
|> update_section_info!(section.id, fn section ->
|
||||
%{section | evaluation_queue: section.evaluation_queue ++ [cell.id]}
|
||||
end)
|
||||
|> set_cell_info!(cell.id, status: :queued)
|
||||
end
|
||||
|
||||
defp unqueue_cell_evaluation_if_any(data, cell, section) do
|
||||
data
|
||||
defp unqueue_cell_evaluation(data_actions, cell, section) do
|
||||
data_actions
|
||||
|> update_section_info!(section.id, fn section ->
|
||||
%{section | evaluation_queue: List.delete(section.evaluation_queue, cell.id)}
|
||||
end)
|
||||
|> set_cell_info!(cell.id, status: :stale)
|
||||
end
|
||||
|
||||
defp add_cell_evaluation_stdout(data, _cell, _string) do
|
||||
data
|
||||
defp add_cell_evaluation_stdout({data, _} = data_actions, _cell, _string) do
|
||||
data_actions
|
||||
|> set!(
|
||||
# TODO: add stdout to cell outputs
|
||||
notebook: data.notebook
|
||||
)
|
||||
end
|
||||
|
||||
defp add_cell_evaluation_response(data, _cell, _response) do
|
||||
data
|
||||
defp add_cell_evaluation_response({data, _} = data_actions, _cell, _response) do
|
||||
data_actions
|
||||
|> set!(
|
||||
# TODO: add result to outputs
|
||||
notebook: data.notebook
|
||||
)
|
||||
end
|
||||
|
||||
defp finish_cell_evaluation(data, cell, section) do
|
||||
data
|
||||
defp finish_cell_evaluation(data_actions, cell, section) do
|
||||
data_actions
|
||||
|> set_cell_info!(cell.id, status: :evaluated, evaluated_at: DateTime.utc_now())
|
||||
|> set_section_info!(section.id, evaluating_cell_id: nil)
|
||||
end
|
||||
|
||||
defp mark_dependent_cells_as_stale(data, cell) do
|
||||
invalidated_cells = evaluated_child_cells(data, cell)
|
||||
defp mark_dependent_cells_as_stale({data, _} = data_actions, cell) do
|
||||
invalidated_cells = child_cells_with_status(data, cell, :evaluated)
|
||||
|
||||
data
|
||||
data_actions
|
||||
|> reduce(invalidated_cells, &set_cell_info!(&1, &2.id, status: :stale))
|
||||
end
|
||||
|
||||
defp fresh_parent_cells_queue(data, cell) do
|
||||
data.notebook
|
||||
|> Notebook.parent_cells(cell.id)
|
||||
|> Enum.filter(fn parent -> data.cell_infos[parent.id].status == :fresh end)
|
||||
|> Enum.take_while(fn parent -> data.cell_infos[parent.id].status == :fresh end)
|
||||
|> Enum.reverse()
|
||||
end
|
||||
|
||||
defp evaluated_child_cells(data, cell) do
|
||||
defp child_cells_with_status(data, cell, status) do
|
||||
data.notebook
|
||||
|> Notebook.child_cells(cell.id)
|
||||
# Mark only evaluted cells as stale
|
||||
|> Enum.filter(fn cell -> data.cell_infos[cell.id].status == :evaluated end)
|
||||
|> Enum.filter(fn cell -> data.cell_infos[cell.id].status == status end)
|
||||
end
|
||||
|
||||
# If there are idle sections with non-empty evaluation queue,
|
||||
# the next queued cell for evaluation.
|
||||
defp maybe_evaluate_queued(data) do
|
||||
Enum.reduce(data.notebook.sections, data, fn section, data ->
|
||||
defp maybe_evaluate_queued({data, _} = data_actions) do
|
||||
Enum.reduce(data.notebook.sections, data_actions, fn section, data_actions ->
|
||||
{data, _} = data_actions
|
||||
|
||||
case data.section_infos[section.id] do
|
||||
%{evaluating_cell_id: nil, evaluation_queue: [id | ids]} ->
|
||||
data
|
||||
cell = Enum.find(section.cells, &(&1.id == id))
|
||||
|
||||
data_actions
|
||||
|> set!(notebook: Notebook.update_cell(data.notebook, id, &%{&1 | outputs: []}))
|
||||
|> set_cell_info!(id, status: :evaluating)
|
||||
|> set_section_info!(section.id, evaluating_cell_id: id, evaluation_queue: ids)
|
||||
|> add_action({:start_evaluation, cell, section})
|
||||
|
||||
_ ->
|
||||
data
|
||||
data_actions
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
defp wrap_ok(value), do: {:ok, value}
|
||||
defp clear_section_evaluation(data_actions, section) do
|
||||
data_actions
|
||||
|> set_section_info!(section.id, evaluating_cell_id: nil, evaluation_queue: [])
|
||||
|> reduce(section.cells, &set_cell_info!(&1, &2.id, status: :fresh))
|
||||
|> add_action({:stop_evaluation, section})
|
||||
end
|
||||
|
||||
defp queue_prerequisite_cells_evaluation({data, _} = data_actions, cell, section) do
|
||||
prerequisites_queue = fresh_parent_cells_queue(data, cell)
|
||||
|
||||
data_actions
|
||||
|> reduce(prerequisites_queue, &queue_cell_evaluation(&1, &2, section))
|
||||
end
|
||||
|
||||
defp unqueue_dependent_cells_evaluation({data, _} = data_actions, cell, section) do
|
||||
queued_dependent_cells = child_cells_with_status(data, cell, :queued)
|
||||
|
||||
data_actions
|
||||
|> reduce(queued_dependent_cells, &unqueue_cell_evaluation(&1, &2, section))
|
||||
end
|
||||
|
||||
defp add_action({data, actions}, action) do
|
||||
{data, actions ++ [action]}
|
||||
end
|
||||
|
||||
defp new_section_info() do
|
||||
%{
|
||||
|
@ -314,40 +389,41 @@ defmodule LiveBook.Session.Data do
|
|||
}
|
||||
end
|
||||
|
||||
defp set!(data, changes) do
|
||||
defp set!({data, actions}, changes) do
|
||||
Enum.reduce(changes, data, fn {key, value}, info ->
|
||||
Map.replace!(info, key, value)
|
||||
end)
|
||||
|> with_actions(actions)
|
||||
end
|
||||
|
||||
defp set_cell_info!(data, cell_id, changes) do
|
||||
update_cell_info!(data, cell_id, fn info ->
|
||||
defp set_cell_info!(data_actions, cell_id, changes) do
|
||||
update_cell_info!(data_actions, cell_id, fn info ->
|
||||
Enum.reduce(changes, info, fn {key, value}, info ->
|
||||
Map.replace!(info, key, value)
|
||||
end)
|
||||
end)
|
||||
end
|
||||
|
||||
defp update_cell_info!(data, cell_id, fun) do
|
||||
defp update_cell_info!({data, _} = data_actions, cell_id, fun) do
|
||||
cell_infos = Map.update!(data.cell_infos, cell_id, fun)
|
||||
set!(data, cell_infos: cell_infos)
|
||||
set!(data_actions, cell_infos: cell_infos)
|
||||
end
|
||||
|
||||
defp set_section_info!(data, section_id, changes) do
|
||||
update_section_info!(data, section_id, fn info ->
|
||||
defp set_section_info!(data_actions, section_id, changes) do
|
||||
update_section_info!(data_actions, section_id, fn info ->
|
||||
Enum.reduce(changes, info, fn {key, value}, info ->
|
||||
Map.replace!(info, key, value)
|
||||
end)
|
||||
end)
|
||||
end
|
||||
|
||||
defp update_section_info!(data, section_id, fun) do
|
||||
defp update_section_info!({data, _} = data_actions, section_id, fun) do
|
||||
section_infos = Map.update!(data.section_infos, section_id, fun)
|
||||
set!(data, section_infos: section_infos)
|
||||
set!(data_actions, section_infos: section_infos)
|
||||
end
|
||||
|
||||
defp reduce(data, list, reducer) do
|
||||
Enum.reduce(list, data, fn elem, data -> reducer.(data, elem) end)
|
||||
defp reduce(data_actions, list, reducer) do
|
||||
Enum.reduce(list, data_actions, fn elem, data_actions -> reducer.(data_actions, elem) end)
|
||||
end
|
||||
|
||||
@doc """
|
||||
|
|
|
@ -15,7 +15,7 @@ defmodule LiveBook.Session.DataTest do
|
|||
sections: [%{id: "s1"}]
|
||||
},
|
||||
section_infos: %{"s1" => _}
|
||||
}} = Data.apply_operation(data, operation)
|
||||
}, []} = Data.apply_operation(data, operation)
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -42,7 +42,7 @@ defmodule LiveBook.Session.DataTest do
|
|||
]
|
||||
},
|
||||
cell_infos: %{"c1" => _}
|
||||
}} = Data.apply_operation(data, operation)
|
||||
}, []} = Data.apply_operation(data, operation)
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -53,18 +53,6 @@ defmodule LiveBook.Session.DataTest do
|
|||
assert :error = Data.apply_operation(data, operation)
|
||||
end
|
||||
|
||||
test "returns an error for an evaluating section" do
|
||||
data =
|
||||
data_after_operations!([
|
||||
{:insert_section, 0, "s1"},
|
||||
{:insert_cell, "s1", 0, :elixir, "c1"},
|
||||
{:queue_cell_evaluation, "c1"}
|
||||
])
|
||||
|
||||
operation = {:delete_section, "s1"}
|
||||
assert :error = Data.apply_operation(data, operation)
|
||||
end
|
||||
|
||||
test "removes the section from notebook and session info, adds to deleted sections" do
|
||||
data =
|
||||
data_after_operations!([
|
||||
|
@ -81,7 +69,7 @@ defmodule LiveBook.Session.DataTest do
|
|||
},
|
||||
section_infos: ^empty_map,
|
||||
deleted_sections: [%{id: "s1"}]
|
||||
}} = Data.apply_operation(data, operation)
|
||||
}, []} = Data.apply_operation(data, operation)
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -92,16 +80,23 @@ defmodule LiveBook.Session.DataTest do
|
|||
assert :error = Data.apply_operation(data, operation)
|
||||
end
|
||||
|
||||
test "returns an error for an evaluating cell" do
|
||||
test "if the cell is evaluating, cencels section evaluation" do
|
||||
data =
|
||||
data_after_operations!([
|
||||
{:insert_section, 0, "s1"},
|
||||
{:insert_cell, "s1", 0, :elixir, "c1"},
|
||||
{:queue_cell_evaluation, "c1"}
|
||||
{:insert_cell, "s1", 1, :elixir, "c2"},
|
||||
{:queue_cell_evaluation, "c1"},
|
||||
{:queue_cell_evaluation, "c2"}
|
||||
])
|
||||
|
||||
operation = {:delete_cell, "c1"}
|
||||
assert :error = Data.apply_operation(data, operation)
|
||||
|
||||
assert {:ok,
|
||||
%{
|
||||
cell_infos: %{"c2" => %{status: :fresh}},
|
||||
section_infos: %{"s1" => %{evaluating_cell_id: nil, evaluation_queue: []}}
|
||||
}, _actions} = Data.apply_operation(data, operation)
|
||||
end
|
||||
|
||||
test "removes the cell from notebook and session info, adds to deleted cells" do
|
||||
|
@ -121,7 +116,7 @@ defmodule LiveBook.Session.DataTest do
|
|||
},
|
||||
cell_infos: ^empty_map,
|
||||
deleted_cells: [%{id: "c1"}]
|
||||
}} = Data.apply_operation(data, operation)
|
||||
}, _actions} = Data.apply_operation(data, operation)
|
||||
end
|
||||
|
||||
test "unqueues the cell if it's queued for evaluation" do
|
||||
|
@ -139,7 +134,7 @@ defmodule LiveBook.Session.DataTest do
|
|||
assert {:ok,
|
||||
%{
|
||||
section_infos: %{"s1" => %{evaluation_queue: []}}
|
||||
}} = Data.apply_operation(data, operation)
|
||||
}, _actions} = Data.apply_operation(data, operation)
|
||||
end
|
||||
|
||||
test "marks evaluated child cells as stale" do
|
||||
|
@ -160,7 +155,20 @@ defmodule LiveBook.Session.DataTest do
|
|||
assert {:ok,
|
||||
%{
|
||||
cell_infos: %{"c2" => %{status: :stale}}
|
||||
}} = Data.apply_operation(data, operation)
|
||||
}, _actions} = Data.apply_operation(data, operation)
|
||||
end
|
||||
|
||||
test "returns forget evaluation action" do
|
||||
data =
|
||||
data_after_operations!([
|
||||
{:insert_section, 0, "s1"},
|
||||
{:insert_cell, "s1", 0, :elixir, "c1"}
|
||||
])
|
||||
|
||||
operation = {:delete_cell, "c1"}
|
||||
|
||||
assert {:ok, _data, [{:forget_evaluation, %{id: "c1"}, %{id: "s1"}}]} =
|
||||
Data.apply_operation(data, operation)
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -207,7 +215,20 @@ defmodule LiveBook.Session.DataTest do
|
|||
%{
|
||||
cell_infos: %{"c1" => %{status: :evaluating}},
|
||||
section_infos: %{"s1" => %{evaluating_cell_id: "c1", evaluation_queue: []}}
|
||||
}} = Data.apply_operation(data, operation)
|
||||
}, _actions} = Data.apply_operation(data, operation)
|
||||
end
|
||||
|
||||
test "returns start evaluation action if the corresponding section is idle" do
|
||||
data =
|
||||
data_after_operations!([
|
||||
{:insert_section, 0, "s1"},
|
||||
{:insert_cell, "s1", 0, :elixir, "c1"}
|
||||
])
|
||||
|
||||
operation = {:queue_cell_evaluation, "c1"}
|
||||
|
||||
assert {:ok, _data, [{:start_evaluation, %{id: "c1"}, %{id: "s1"}}]} =
|
||||
Data.apply_operation(data, operation)
|
||||
end
|
||||
|
||||
test "marks the cell as queued if the corresponding section is already evaluating" do
|
||||
|
@ -225,7 +246,7 @@ defmodule LiveBook.Session.DataTest do
|
|||
%{
|
||||
cell_infos: %{"c2" => %{status: :queued}},
|
||||
section_infos: %{"s1" => %{evaluating_cell_id: "c1", evaluation_queue: ["c2"]}}
|
||||
}} = Data.apply_operation(data, operation)
|
||||
}, []} = Data.apply_operation(data, operation)
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -255,7 +276,7 @@ defmodule LiveBook.Session.DataTest do
|
|||
%{
|
||||
cell_infos: %{"c1" => %{status: :evaluated}},
|
||||
section_infos: %{"s1" => %{evaluating_cell_id: nil, evaluation_queue: []}}
|
||||
}} = Data.apply_operation(data, operation)
|
||||
}, []} = Data.apply_operation(data, operation)
|
||||
end
|
||||
|
||||
test "marks next queued cell in this section as evaluating if there is one" do
|
||||
|
@ -274,7 +295,9 @@ defmodule LiveBook.Session.DataTest do
|
|||
%{
|
||||
cell_infos: %{"c2" => %{status: :evaluating}},
|
||||
section_infos: %{"s1" => %{evaluating_cell_id: "c2", evaluation_queue: []}}
|
||||
}} = Data.apply_operation(data, operation)
|
||||
},
|
||||
[{:start_evaluation, %{id: "c2"}, %{id: "s1"}}]} =
|
||||
Data.apply_operation(data, operation)
|
||||
end
|
||||
|
||||
test "if parent cells are not executed, marks them for evaluation first" do
|
||||
|
@ -294,7 +317,9 @@ defmodule LiveBook.Session.DataTest do
|
|||
"c2" => %{status: :queued}
|
||||
},
|
||||
section_infos: %{"s1" => %{evaluating_cell_id: "c1", evaluation_queue: ["c2"]}}
|
||||
}} = Data.apply_operation(data, operation)
|
||||
},
|
||||
[{:start_evaluation, %{id: "c1"}, %{id: "s1"}}]} =
|
||||
Data.apply_operation(data, operation)
|
||||
end
|
||||
|
||||
test "marks evaluated child cells as stale" do
|
||||
|
@ -317,14 +342,110 @@ defmodule LiveBook.Session.DataTest do
|
|||
assert {:ok,
|
||||
%{
|
||||
cell_infos: %{"c2" => %{status: :stale}}
|
||||
}} = Data.apply_operation(data, operation)
|
||||
}, []} = Data.apply_operation(data, operation)
|
||||
end
|
||||
end
|
||||
|
||||
describe "apply_operation/2 given :cancel_cell_evaluation" do
|
||||
test "returns an error given invalid cell id" do
|
||||
data = Data.new()
|
||||
operation = {:cancel_cell_evaluation, "nonexistent"}
|
||||
assert :error = Data.apply_operation(data, operation)
|
||||
end
|
||||
|
||||
test "returns an error for an evaluated cell" do
|
||||
data =
|
||||
data_after_operations!([
|
||||
{:insert_section, 0, "s1"},
|
||||
{:insert_cell, "s1", 0, :elixir, "c1"},
|
||||
{:queue_cell_evaluation, "c1"},
|
||||
{:add_cell_evaluation_response, "c1", {:ok, [1, 2, 3]}}
|
||||
])
|
||||
|
||||
operation = {:cancel_cell_evaluation, "c1"}
|
||||
assert :error = Data.apply_operation(data, operation)
|
||||
end
|
||||
|
||||
test "if the cell is evaluating, clears the corresponding section evaluation and the queue" do
|
||||
data =
|
||||
data_after_operations!([
|
||||
{:insert_section, 0, "s1"},
|
||||
{:insert_cell, "s1", 0, :elixir, "c1"},
|
||||
{:insert_cell, "s1", 1, :elixir, "c2"},
|
||||
{:queue_cell_evaluation, "c1"},
|
||||
{:queue_cell_evaluation, "c2"}
|
||||
])
|
||||
|
||||
operation = {:cancel_cell_evaluation, "c1"}
|
||||
|
||||
assert {:ok,
|
||||
%{
|
||||
cell_infos: %{"c1" => %{status: :fresh}, "c2" => %{status: :fresh}},
|
||||
section_infos: %{"s1" => %{evaluating_cell_id: nil, evaluation_queue: []}}
|
||||
}, _actions} = Data.apply_operation(data, operation)
|
||||
end
|
||||
|
||||
test "if the cell is evaluating, returns stop evaluation action" do
|
||||
data =
|
||||
data_after_operations!([
|
||||
{:insert_section, 0, "s1"},
|
||||
{:insert_cell, "s1", 0, :elixir, "c1"},
|
||||
{:insert_cell, "s1", 1, :elixir, "c2"},
|
||||
{:queue_cell_evaluation, "c1"},
|
||||
{:queue_cell_evaluation, "c2"}
|
||||
])
|
||||
|
||||
operation = {:cancel_cell_evaluation, "c1"}
|
||||
|
||||
assert {:ok, _data, [{:stop_evaluation, %{id: "s1"}}]} =
|
||||
Data.apply_operation(data, operation)
|
||||
end
|
||||
|
||||
test "if the cell is queued, unqueues it" do
|
||||
data =
|
||||
data_after_operations!([
|
||||
{:insert_section, 0, "s1"},
|
||||
{:insert_cell, "s1", 0, :elixir, "c1"},
|
||||
{:insert_cell, "s1", 1, :elixir, "c2"},
|
||||
{:queue_cell_evaluation, "c1"},
|
||||
{:queue_cell_evaluation, "c2"}
|
||||
])
|
||||
|
||||
operation = {:cancel_cell_evaluation, "c2"}
|
||||
|
||||
assert {:ok,
|
||||
%{
|
||||
cell_infos: %{"c2" => %{status: :stale}},
|
||||
section_infos: %{"s1" => %{evaluating_cell_id: "c1", evaluation_queue: []}}
|
||||
}, []} = Data.apply_operation(data, operation)
|
||||
end
|
||||
|
||||
test "if the cell is queued, unqueues dependent cells that are also queued" do
|
||||
data =
|
||||
data_after_operations!([
|
||||
{:insert_section, 0, "s1"},
|
||||
{:insert_cell, "s1", 0, :elixir, "c1"},
|
||||
{:insert_cell, "s1", 1, :elixir, "c2"},
|
||||
{:insert_cell, "s1", 2, :elixir, "c3"},
|
||||
{:queue_cell_evaluation, "c1"},
|
||||
{:queue_cell_evaluation, "c2"},
|
||||
{:queue_cell_evaluation, "c3"}
|
||||
])
|
||||
|
||||
operation = {:cancel_cell_evaluation, "c2"}
|
||||
|
||||
assert {:ok,
|
||||
%{
|
||||
cell_infos: %{"c3" => %{status: :stale}},
|
||||
section_infos: %{"s1" => %{evaluating_cell_id: "c1", evaluation_queue: []}}
|
||||
}, []} = Data.apply_operation(data, operation)
|
||||
end
|
||||
end
|
||||
|
||||
defp data_after_operations!(operations) do
|
||||
Enum.reduce(operations, Data.new(), fn operation, data ->
|
||||
case Data.apply_operation(data, operation) do
|
||||
{:ok, data} ->
|
||||
{:ok, data, _action} ->
|
||||
data
|
||||
|
||||
:error ->
|
||||
|
|
|
@ -72,6 +72,18 @@ defmodule LiveBook.SessionTest do
|
|||
end
|
||||
end
|
||||
|
||||
describe "cancel_cell_evaluation/2" do
|
||||
test "sends a cancel evaluation operation to subscribers", %{session_id: session_id} do
|
||||
Phoenix.PubSub.subscribe(LiveBook.PubSub, "sessions:#{session_id}")
|
||||
|
||||
{_section_id, cell_id} = insert_section_and_cell(session_id)
|
||||
queue_evaluation(session_id, cell_id)
|
||||
|
||||
Session.cancel_cell_evaluation(session_id, cell_id)
|
||||
assert_receive {:operation, {:cancel_cell_evaluation, ^cell_id}}
|
||||
end
|
||||
end
|
||||
|
||||
defp insert_section_and_cell(session_id) do
|
||||
Session.insert_section(session_id, 0)
|
||||
assert_receive {:operation, {:insert_section, 0, section_id}}
|
||||
|
@ -80,4 +92,9 @@ defmodule LiveBook.SessionTest do
|
|||
|
||||
{section_id, cell_id}
|
||||
end
|
||||
|
||||
defp queue_evaluation(session_id, cell_id) do
|
||||
Session.queue_cell_evaluation(session_id, cell_id)
|
||||
assert_receive {:operation, {:add_cell_evaluation_response, ^cell_id, _}}
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue