diff --git a/lib/live_book/application.ex b/lib/live_book/application.ex index a743adf20..89bb142a8 100644 --- a/lib/live_book/application.ex +++ b/lib/live_book/application.ex @@ -13,6 +13,8 @@ defmodule LiveBook.Application do {Phoenix.PubSub, name: LiveBook.PubSub}, # Start the supervisor dynamically managing sessions LiveBook.SessionSupervisor, + # Start the supervisor dynamically spawning evaluator servers + LiveBook.EvaluatorSupervisor, # Start the Endpoint (http/https) LiveBookWeb.Endpoint ] diff --git a/lib/live_book/evaluator.ex b/lib/live_book/evaluator.ex index 08cf4cac5..1375c4a00 100644 --- a/lib/live_book/evaluator.ex +++ b/lib/live_book/evaluator.ex @@ -11,7 +11,7 @@ defmodule LiveBook.Evaluator do # where the evaluation happens, as otherwise we would have to # send them between processes, effectively copying potentially large data. - use GenServer + use GenServer, restart: :temporary alias LiveBook.Evaluator @@ -46,23 +46,19 @@ defmodule LiveBook.Evaluator do end @doc """ - Synchronously parses and evaluates the given code. + Asynchronously parses and evaluates the given code. Any exceptions are captured, in which case this method returns an error. The evaluator stores the resulting binding and environment under `ref`. Any subsequent calls may specify `prev_ref` pointing to a previous evaluation, in which case the corresponding binding and environment are used during evaluation. + + Evaluation response is sent to the process identified by `send_to` as `{:evaluation_response, ref, response}`. """ - @spec evaluate_code(t(), String.t(), ref(), ref()) :: evaluation_response() - def evaluate_code(evaluator, code, ref, prev_ref \\ :initial) when ref != :initial do - response = GenServer.call(evaluator, {:evaluate_code, code, ref, prev_ref}, :infinity) - - if response == :invalid_prev_ref do - raise ArgumentError, message: "invalid reference to previous evaluation: #{prev_ref}" - end - - response + @spec evaluate_code(t(), pid(), String.t(), ref(), ref()) :: :ok + def evaluate_code(evaluator, send_to, code, ref, prev_ref \\ :initial) when ref != :initial do + GenServer.cast(evaluator, {:evaluate_code, send_to, code, ref, prev_ref}) end @doc """ @@ -99,29 +95,26 @@ defmodule LiveBook.Evaluator do end @impl true - def handle_call({:evaluate_code, code, ref, prev_ref}, {from, _}, state) do - case Map.fetch(state.contexts, prev_ref) do - :error -> - {:reply, :invalid_prev_ref, state} + def handle_cast({:evaluate_code, send_to, code, ref, prev_ref}, state) do + Evaluator.IOProxy.configure(state.io_proxy, send_to, ref) - {:ok, context} -> - Evaluator.IOProxy.configure(state.io_proxy, from, ref) + context = Map.get(state.contexts, prev_ref, state.contexts.initial) - case eval(code, context.binding, context.env) do - {:ok, result, binding, env} -> - result_context = %{binding: binding, env: env} - new_contexts = Map.put(state.contexts, ref, result_context) - new_state = %{state | contexts: new_contexts} + case eval(code, context.binding, context.env) do + {:ok, result, binding, env} -> + result_context = %{binding: binding, env: env} + new_contexts = Map.put(state.contexts, ref, result_context) + new_state = %{state | contexts: new_contexts} - {:reply, {:ok, result}, new_state} + send(send_to, {:evaluator_response, ref, {:ok, result}}) + {:noreply, new_state} - {:error, kind, error, stacktrace} -> - {:reply, {:error, kind, error, stacktrace}, state} - end + {:error, kind, error, stacktrace} -> + send(send_to, {:evaluator_response, ref, {:error, kind, error, stacktrace}}) + {:noreply, state} end end - @impl true def handle_cast({:forget_evaluation, ref}, state) do new_state = %{state | contexts: Map.delete(state.contexts, ref)} {:noreply, new_state} diff --git a/lib/live_book/evaluator_supervisor.ex b/lib/live_book/evaluator_supervisor.ex new file mode 100644 index 000000000..ac9ddea7f --- /dev/null +++ b/lib/live_book/evaluator_supervisor.ex @@ -0,0 +1,43 @@ +defmodule LiveBook.EvaluatorSupervisor do + @moduledoc false + + # Supervisor responsible for dynamically spawning + # and terminating terminator server processes. + + use DynamicSupervisor + + alias LiveBook.Evaluator + + @name __MODULE__ + + def start_link(opts \\ []) do + DynamicSupervisor.start_link(__MODULE__, opts, name: @name) + end + + @impl true + def init(_opts) do + DynamicSupervisor.init(strategy: :one_for_one) + end + + @doc """ + Spawns a new evaluator. + """ + @spec start_evaluator() :: {:ok, Evaluator.t()} | {:error, any()} + def start_evaluator() do + case DynamicSupervisor.start_child(@name, Evaluator) do + {:ok, pid} -> {:ok, pid} + {:ok, pid, _} -> {:ok, pid} + :ignore -> {:error, :ignore} + {:error, reason} -> {:error, reason} + end + end + + @doc """ + Terminates the given evaluator. + """ + @spec terminate_evaluator(Evaluator.t()) :: :ok + def terminate_evaluator(evaluator) do + DynamicSupervisor.terminate_child(@name, evaluator) + :ok + end +end diff --git a/lib/live_book/notebook.ex b/lib/live_book/notebook.ex index d7f927339..011f5fbf1 100644 --- a/lib/live_book/notebook.ex +++ b/lib/live_book/notebook.ex @@ -2,7 +2,7 @@ defmodule LiveBook.Notebook do @moduledoc """ Data structure representing a notebook. - A notebook it's just the representation and roughly + A notebook is just the representation and roughly maps to a file that the user can edit. A notebook *session* is a living process that holds a specific notebook instance and allows users to collaboratively apply @@ -13,7 +13,7 @@ defmodule LiveBook.Notebook do defstruct [:name, :version, :sections, :metadata] - alias LiveBook.Notebook.Section + alias LiveBook.Notebook.{Section, Cell} @type t :: %__MODULE__{ name: String.t(), @@ -36,4 +36,135 @@ defmodule LiveBook.Notebook do metadata: %{} } end + + @doc """ + Finds notebook section by id. + """ + @spec fetch_section(t(), Section.id()) :: {:ok, Section.t()} | :error + def fetch_section(notebook, section_id) do + Enum.find_value(notebook.sections, :error, fn section -> + section.id == section_id && {:ok, section} + end) + end + + @doc """ + Finds notebook cell by `id` and the corresponding section. + """ + @spec fetch_cell_and_section(t(), Cell.section_id()) :: {:ok, Cell.t(), Section.t()} | :error + def fetch_cell_and_section(notebook, cell_id) do + for( + section <- notebook.sections, + cell <- section.cells, + cell.id == cell_id, + do: {cell, section} + ) + |> case do + [{cell, section}] -> {:ok, cell, section} + [] -> :error + end + end + + @doc """ + Inserts `section` at the given `index`. + """ + @spec insert_section(t(), integer(), Section.t()) :: t() + def insert_section(notebook, index, section) do + sections = List.insert_at(notebook.sections, index, section) + + %{notebook | sections: sections} + end + + @doc """ + Inserts `cell` at the given `index` within section identified by `section_id`. + """ + @spec insert_cell(t(), Section.id(), integer(), Cell.t()) :: t() + def insert_cell(notebook, section_id, index, cell) do + sections = + Enum.map(notebook.sections, fn section -> + if section.id == section_id do + %{section | cells: List.insert_at(section.cells, index, cell)} + else + section + end + end) + + %{notebook | sections: sections} + end + + @doc """ + Deletes section with the given id. + """ + @spec delete_section(t(), Section.id()) :: t() + def delete_section(notebook, section_id) do + sections = Enum.reject(notebook.sections, &(&1.id == section_id)) + + %{notebook | sections: sections} + end + + @doc """ + Deletes cell with the given id. + """ + @spec delete_cell(t(), Cell.id()) :: t() + def delete_cell(notebook, cell_id) do + sections = + Enum.map(notebook.sections, fn section -> + %{section | cells: Enum.reject(section.cells, &(&1.id == cell_id))} + end) + + %{notebook | sections: sections} + end + + @doc """ + Updates cell with the given function. + """ + @spec update_cell(t(), Cell.id(), (Cell.t() -> Cell.t())) :: t() + def update_cell(notebook, cell_id, fun) do + sections = + Enum.map(notebook.sections, fn section -> + cells = + Enum.map(section.cells, fn cell -> + if cell.id == cell_id, do: fun.(cell), else: cell + end) + + %{section | cells: cells} + end) + + %{notebook | sections: sections} + end + + @doc """ + Returns a list of Elixir cells that the given cell depends on. + + The cells are ordered starting from the most direct parent. + """ + @spec parent_cells(t(), Cell.id()) :: list(Cell.t()) + def parent_cells(notebook, cell_id) do + with {:ok, _, section} <- LiveBook.Notebook.fetch_cell_and_section(notebook, cell_id) do + # A cell depends on all previous cells within the same section. + section.cells + |> Enum.filter(&(&1.type == :elixir)) + |> Enum.take_while(&(&1.id != cell_id)) + |> Enum.reverse() + else + _ -> [] + end + end + + @doc """ + Returns a list of Elixir cells that depend on the given cell. + + The cells are ordered starting from the most direct child. + """ + @spec child_cells(t(), Cell.id()) :: list(Cell.t()) + def child_cells(notebook, cell_id) do + with {:ok, _, section} <- LiveBook.Notebook.fetch_cell_and_section(notebook, cell_id) do + # A cell affects all the cells below it within the same section. + section.cells + |> Enum.filter(&(&1.type == :elixir)) + |> Enum.reverse() + |> Enum.take_while(&(&1.id != cell_id)) + else + _ -> [] + end + end end diff --git a/lib/live_book/notebook/cell.ex b/lib/live_book/notebook/cell.ex index 0c7b674c1..8f3f84f32 100644 --- a/lib/live_book/notebook/cell.ex +++ b/lib/live_book/notebook/cell.ex @@ -11,12 +11,12 @@ defmodule LiveBook.Notebook.Cell do alias LiveBook.Utils - @type cell_id :: Utils.id() - @type cell_type :: :markdown | :elixir + @type id :: Utils.id() + @type type :: :markdown | :elixir @type t :: %__MODULE__{ - id: cell_id(), - type: cell_type(), + id: id(), + type: type(), source: String.t(), # TODO: expand on this outputs: list(), @@ -26,7 +26,7 @@ defmodule LiveBook.Notebook.Cell do @doc """ Returns an empty cell of the given type. """ - @spec new(cell_type()) :: t() + @spec new(type()) :: t() def new(type) do %__MODULE__{ id: Utils.random_id(), diff --git a/lib/live_book/notebook/section.ex b/lib/live_book/notebook/section.ex index 7fe788b15..ae7bf0b2e 100644 --- a/lib/live_book/notebook/section.ex +++ b/lib/live_book/notebook/section.ex @@ -11,10 +11,10 @@ defmodule LiveBook.Notebook.Section do alias LiveBook.Notebook.Cell alias LiveBook.Utils - @type section_id :: Utils.id() + @type id :: Utils.id() @type t :: %__MODULE__{ - id: section_id(), + id: id(), name: String.t(), cells: list(Cell.t()), metadata: %{atom() => term()} diff --git a/lib/live_book/session.ex b/lib/live_book/session.ex index a8c648a2c..14c0a1972 100644 --- a/lib/live_book/session.ex +++ b/lib/live_book/session.ex @@ -7,13 +7,28 @@ defmodule LiveBook.Session do # as a source of truth that multiple clients talk to. # Receives update requests from the clients and notifies # them of any changes applied to the notebook. + # + # The core concept is the `Data` structure + # to which we can apply reproducible opreations. + # See `Data` for more information. use GenServer, restart: :temporary + alias LiveBook.Session.Data + alias LiveBook.{Evaluator, EvaluatorSupervisor, Utils, Notebook} + alias LiveBook.Notebook.{Cell, Section} + + @type state :: %{ + session_id: id(), + data: Data.t(), + evaluators: %{Section.t() => Evaluator.t()}, + client_pids: list(pid()) + } + @typedoc """ An id assigned to every running session process. """ - @type session_id :: LiveBook.Utils.id() + @type id :: Utils.id() ## API @@ -21,7 +36,7 @@ defmodule LiveBook.Session do Starts the server process and registers it globally using the `:global` module, so that it's identifiable by the given id. """ - @spec start_link(session_id()) :: GenServer.on_start() + @spec start_link(id()) :: GenServer.on_start() def start_link(session_id) do GenServer.start_link(__MODULE__, [session_id: session_id], name: name(session_id)) end @@ -30,10 +45,61 @@ defmodule LiveBook.Session do {:global, {:session, session_id}} end + @doc """ + Registers a session client, so that it receives updates from the server. + + The client process is automatically unregistered when it terminates. + """ + @spec register_client(id(), pid()) :: :ok + def register_client(session_id, pid) do + GenServer.cast(name(session_id), {:register_client, pid}) + end + + @doc """ + Asynchronously sends section insertion request to the server. + """ + @spec insert_section(id(), non_neg_integer()) :: :ok + def insert_section(session_id, index) do + GenServer.cast(name(session_id), {:insert_section, index}) + end + + @doc """ + Asynchronously sends cell insertion request to the server. + """ + @spec insert_cell(id(), Section.id(), non_neg_integer(), Cell.type()) :: + :ok + def insert_cell(session_id, section_id, index, type) do + GenServer.cast(name(session_id), {:insert_cell, section_id, index, type}) + end + + @doc """ + Asynchronously sends section deletion request to the server. + """ + @spec delete_section(id(), Section.id()) :: :ok + def delete_section(session_id, section_id) do + GenServer.cast(name(session_id), {:delete_section, section_id}) + end + + @doc """ + Asynchronously sends cell deletion request to the server. + """ + @spec delete_cell(id(), Cell.id()) :: :ok + def delete_cell(session_id, cell_id) do + GenServer.cast(name(session_id), {:delete_cell, cell_id}) + end + + @doc """ + Asynchronously sends cell evaluation request to the server. + """ + @spec queue_cell_evaluation(id(), Cell.id()) :: :ok + def queue_cell_evaluation(session_id, cell_id) do + GenServer.cast(name(session_id), {:queue_cell_evaluation, cell_id}) + end + @doc """ Synchronously stops the server. """ - @spec stop(session_id()) :: :ok + @spec stop(id()) :: :ok def stop(session_id) do GenServer.stop(name(session_id)) end @@ -41,7 +107,163 @@ defmodule LiveBook.Session do ## Callbacks @impl true - def init(session_id: _id) do - {:ok, %{}} + def init(session_id: session_id) do + {:ok, + %{ + session_id: session_id, + data: Data.new(), + evaluators: %{}, + client_pids: [] + }} + end + + @impl true + def handle_cast({:register_client, pid}, state) do + Process.monitor(pid) + {:noreply, %{state | client_pids: [pid | state.client_pids]}} + end + + def handle_cast({:insert_section, index}, state) do + # Include new id in the operation, so it's reproducible + operation = {:insert_section, index, Utils.random_id()} + handle_operation(state, operation) + end + + def handle_cast({:insert_cell, section_id, index, type}, state) do + # Include new id in the operation, so it's reproducible + operation = {:insert_cell, section_id, index, type, Utils.random_id()} + handle_operation(state, operation) + end + + def handle_cast({:delete_section, section_id}, state) do + operation = {:delete_section, section_id} + + handle_operation(state, operation, fn new_state -> + delete_section_evaluator(new_state, section_id) + end) + end + + def handle_cast({:delete_cell, cell_id}, state) do + operation = {:delete_cell, cell_id} + handle_operation(state, operation) + end + + def handle_cast({:queue_cell_evaluation, cell_id}, state) do + operation = {:queue_cell_evaluation, cell_id} + + handle_operation(state, operation, fn new_state -> + maybe_trigger_evaluations(state, new_state) + end) + end + + @impl true + def handle_info({:DOWN, _, :process, pid, _}, state) do + {:noreply, %{state | client_pids: List.delete(state.client_pids, pid)}} + end + + def handle_info({:evaluator_stdout, cell_id, string}, state) do + operation = {:add_cell_evaluation_stdout, cell_id, string} + handle_operation(state, operation) + end + + def handle_info({:evaluator_response, cell_id, response}, state) do + operation = {:add_cell_evaluation_response, cell_id, response} + + handle_operation(state, operation, fn new_state -> + maybe_trigger_evaluations(state, new_state) + end) + end + + # --- + + # Given any opeation on `Data`, the process does the following: + # + # * broadcasts the operation to all clients immediately, + # so that they can update their local `Data` + # * applies the operation to own local `Data` + # * optionally performs a relevant task (e.g. starts cell evaluation), + # to reflect the new `Data` + # + defp handle_operation(state, operation) do + handle_operation(state, operation, fn state -> state end) + end + + defp handle_operation(state, operation, handle_new_state) do + broadcast_operation(state.session_id, operation) + + case Data.apply_operation(state.data, operation) do + {:ok, new_data} -> + new_state = %{state | data: new_data} + {:noreply, handle_new_state.(new_state)} + + :error -> + {:noreply, state} + end + end + + defp broadcast_operation(session_id, operation) do + message = {:operation, operation} + Phoenix.PubSub.broadcast(LiveBook.PubSub, "sessions:#{session_id}", message) + end + + # Compares sections in the old and new state and if a new cell + # has been marked as evaluating it triggers the actual evaluation task. + defp maybe_trigger_evaluations(old_state, new_state) do + Enum.reduce(new_state.data.notebook.sections, new_state, fn section, state -> + case {Data.get_evaluating_cell_id(old_state.data, section.id), + Data.get_evaluating_cell_id(new_state.data, section.id)} do + {_, nil} -> + # No cell to evaluate + state + + {cell_id, cell_id} -> + # The evaluating cell hasn't changed, so it must be already evaluating + state + + {_, cell_id} -> + # The evaluating cell changed, so we trigger the evaluation to reflect that + trigger_evaluation(state, cell_id) + end + end) + end + + defp trigger_evaluation(state, cell_id) do + notebook = state.data.notebook + {:ok, cell, section} = Notebook.fetch_cell_and_section(notebook, cell_id) + {state, evaluator} = get_section_evaluator(state, section.id) + %{source: source} = cell + + prev_ref = + case Notebook.parent_cells(notebook, cell_id) do + [parent | _] -> parent.id + [] -> :initial + end + + Evaluator.evaluate_code(evaluator, self(), source, cell_id, prev_ref) + + state + end + + defp get_section_evaluator(state, section_id) do + case Map.fetch(state.evaluators, section_id) do + {:ok, evaluator} -> + {state, evaluator} + + :error -> + {:ok, evaluator} = EvaluatorSupervisor.start_evaluator() + state = %{state | evaluators: Map.put(state.evaluators, section_id, evaluator)} + {state, evaluator} + end + end + + defp delete_section_evaluator(state, section_id) do + case Map.fetch(state.evaluators, section_id) do + {:ok, evaluator} -> + EvaluatorSupervisor.terminate_evaluator(evaluator) + %{state | evaluators: Map.delete(state.evaluators, section_id)} + + :error -> + state + end end end diff --git a/lib/live_book/session/data.ex b/lib/live_book/session/data.ex new file mode 100644 index 000000000..d033b737b --- /dev/null +++ b/lib/live_book/session/data.ex @@ -0,0 +1,361 @@ +defmodule LiveBook.Session.Data do + @moduledoc false + + # A structure with shared session data. + # + # In some sense this structure is a `Notebook` decorated + # with all the emphemeral session data. + # + # The data is kept both in the `Session` process and all the client processes. + # All changes go to the `Session` process first to introduce linearity + # and then are broadcasted to the clients, hence every client + # receives changes in the same order. + # Upon receiving a change message, every process applies + # the change to the locally stored `Data`. In this way the local `Data` + # stays the same in all processes, while the messages are minimal. + + defstruct [ + :notebook, + :path, + :section_infos, + :cell_infos, + :deleted_sections, + :deleted_cells + ] + + alias LiveBook.{Notebook, Evaluator} + alias LiveBook.Notebook.{Cell, Section} + + @type t :: %__MODULE__{ + notebook: Notebook.t(), + path: nil | String.t(), + section_infos: %{Section.id() => section_info()}, + cell_infos: %{Cell.id() => cell_info()}, + deleted_sections: list(Section.t()), + deleted_cells: list(Cell.t()) + } + + @type section_info :: %{ + evaluating_cell_id: Cell.id(), + evaluation_queue: list(Cell.id()) + } + + @type cell_info :: %{ + status: cell_status(), + revision: non_neg_integer(), + # TODO: specify it's a list of deltas, once defined + deltas: list(), + evaluated_at: DateTime.t() + } + + @type cell_status :: :fresh | :queued | :evaluating | :evaluated | :stale + + @type index :: non_neg_integer() + + @type operation :: + {:insert_section, index(), Section.id()} + | {:insert_cell, Section.id(), index(), Cell.type(), Cell.id()} + | {:delete_section, Section.id()} + | {:delete_cell, Cell.id()} + | {:queue_cell_evaluation, Cell.id()} + | {:add_cell_evaluation_stdout, Cell.id(), String.t()} + | {:add_cell_evaluation_response, Cell.id(), Evaluator.evaluation_response()} + + @doc """ + Returns a fresh notebook session state. + """ + @spec new() :: t() + def new() do + %__MODULE__{ + notebook: Notebook.new(), + path: nil, + section_infos: %{}, + cell_infos: %{}, + deleted_sections: [], + deleted_cells: [] + } + end + + @doc """ + Applies the change specified by `operation` to the given session `data`. + + All operations are reproducible (i.e. this function is pure), + so provided all processes have the same session data + they can individually apply any given operation and end up in the same state. + + An operation only applies changes to the structure, but it doesn't trigger + any actual processing. It's the responsibility of the session process to ensure + the system reflects the new structure. For instance, when a new cell is marked + as evaluating, the session process should take care of triggering actual evaluation. + + Returns `{:ok, data}` on correct application or `:error` if the operation + is not valid. The `:error` is generally expected given the collaborative + nature of sessions. For example if there are simultaneous deletion + and evaluation operations on the same cell, we may perform delete first, + in which case the evaluation is no longer valid (there's no cell with the given id). + By returning `:error` we simply notify the caller that no changes were applied, + so any related actions can be ignored. + """ + @spec apply_operation(t(), operation()) :: {:ok, t()} | :error + def apply_operation(data, operation) + + def apply_operation(data, {:insert_section, index, id}) do + section = %{Section.new() | id: id} + + data + |> insert_section(index, section) + |> wrap_ok() + end + + def apply_operation(data, {:insert_cell, section_id, index, type, id}) do + with {:ok, _section} <- Notebook.fetch_section(data.notebook, section_id) do + cell = %{Cell.new(type) | id: id} + + data + |> insert_cell(section_id, index, cell) + |> wrap_ok() + end + end + + def apply_operation(data, {:delete_section, id}) do + with {:ok, section} <- Notebook.fetch_section(data.notebook, id), + # If a cell within this section is being evaluated, it should be cancelled first + nil <- data.section_infos[section.id].evaluating_cell_id do + data + |> delete_section(section) + |> wrap_ok() + else + _ -> :error + end + end + + def apply_operation(data, {:delete_cell, id}) do + with {:ok, cell, section} <- Notebook.fetch_cell_and_section(data.notebook, id), + # If the cell is being evaluated, it should be cancelled first + false <- data.cell_infos[cell.id].status == :evaluating do + data + |> unqueue_cell_evaluation_if_any(cell, section) + |> mark_dependent_cells_as_stale(cell) + |> delete_cell(cell) + |> wrap_ok() + else + _ -> :error + end + end + + def apply_operation(data, {:queue_cell_evaluation, id}) do + with {:ok, cell, section} <- Notebook.fetch_cell_and_section(data.notebook, id), + :elixir <- cell.type, + false <- data.cell_infos[cell.id].status in [:queued, :evaluating] do + prerequisites_queue = fresh_parent_cells_queue(data, cell) + + data + |> reduce(prerequisites_queue, &queue_cell_evaluation(&1, &2, section)) + |> queue_cell_evaluation(cell, section) + |> maybe_evaluate_queued() + |> wrap_ok() + else + _ -> :error + end + end + + def apply_operation(data, {:add_cell_evaluation_stdout, id, string}) do + with {:ok, cell, _} <- Notebook.fetch_cell_and_section(data.notebook, id) do + data + |> add_cell_evaluation_stdout(cell, string) + |> wrap_ok() + end + end + + def apply_operation(data, {:add_cell_evaluation_response, id, response}) do + with {:ok, cell, section} <- Notebook.fetch_cell_and_section(data.notebook, id) do + data + |> add_cell_evaluation_response(cell, response) + |> finish_cell_evaluation(cell, section) + |> mark_dependent_cells_as_stale(cell) + |> maybe_evaluate_queued() + |> wrap_ok() + end + end + + # === + + defp insert_section(data, index, section) do + data + |> set!( + notebook: Notebook.insert_section(data.notebook, index, section), + section_infos: Map.put(data.section_infos, section.id, new_section_info()) + ) + end + + defp insert_cell(data, section_id, index, cell) do + data + |> set!( + notebook: Notebook.insert_cell(data.notebook, section_id, index, cell), + cell_infos: Map.put(data.cell_infos, cell.id, new_cell_info()) + ) + end + + defp delete_section(data, section) do + data + |> set!( + notebook: Notebook.delete_section(data.notebook, section.id), + section_infos: Map.delete(data.section_infos, section.id), + deleted_sections: [section | data.deleted_sections] + ) + |> reduce(section.cells, &delete_cell_info/2) + end + + defp delete_cell(data, cell) do + data + |> set!( + notebook: Notebook.delete_cell(data.notebook, cell.id), + deleted_cells: [cell | data.deleted_cells] + ) + |> delete_cell_info(cell) + end + + defp delete_cell_info(data, cell) do + data + |> set!(cell_infos: Map.delete(data.cell_infos, cell.id)) + end + + defp queue_cell_evaluation(data, cell, section) do + data + |> update_section_info!(section.id, fn section -> + %{section | evaluation_queue: section.evaluation_queue ++ [cell.id]} + end) + |> set_cell_info!(cell.id, status: :queued) + end + + defp unqueue_cell_evaluation_if_any(data, cell, section) do + data + |> update_section_info!(section.id, fn section -> + %{section | evaluation_queue: List.delete(section.evaluation_queue, cell.id)} + end) + |> set_cell_info!(cell.id, status: :stale) + end + + defp add_cell_evaluation_stdout(data, _cell, _string) do + data + |> set!( + # TODO: add stdout to cell outputs + notebook: data.notebook + ) + end + + defp add_cell_evaluation_response(data, _cell, _response) do + data + |> set!( + # TODO: add result to outputs + notebook: data.notebook + ) + end + + defp finish_cell_evaluation(data, cell, section) do + data + |> set_cell_info!(cell.id, status: :evaluated, evaluated_at: DateTime.utc_now()) + |> set_section_info!(section.id, evaluating_cell_id: nil) + end + + defp mark_dependent_cells_as_stale(data, cell) do + invalidated_cells = evaluated_child_cells(data, cell) + + data + |> reduce(invalidated_cells, &set_cell_info!(&1, &2.id, status: :stale)) + end + + defp fresh_parent_cells_queue(data, cell) do + data.notebook + |> Notebook.parent_cells(cell.id) + |> Enum.filter(fn parent -> data.cell_infos[parent.id].status == :fresh end) + |> Enum.reverse() + end + + defp evaluated_child_cells(data, cell) do + data.notebook + |> Notebook.child_cells(cell.id) + # Mark only evaluted cells as stale + |> Enum.filter(fn cell -> data.cell_infos[cell.id].status == :evaluated end) + end + + # If there are idle sections with non-empty evaluation queue, + # the next queued cell for evaluation. + defp maybe_evaluate_queued(data) do + Enum.reduce(data.notebook.sections, data, fn section, data -> + case data.section_infos[section.id] do + %{evaluating_cell_id: nil, evaluation_queue: [id | ids]} -> + data + |> set!(notebook: Notebook.update_cell(data.notebook, id, &%{&1 | outputs: []})) + |> set_cell_info!(id, status: :evaluating) + |> set_section_info!(section.id, evaluating_cell_id: id, evaluation_queue: ids) + + _ -> + data + end + end) + end + + defp wrap_ok(value), do: {:ok, value} + + defp new_section_info() do + %{ + evaluating_cell_id: nil, + evaluation_queue: [] + } + end + + defp new_cell_info() do + %{ + revision: 0, + deltas: [], + status: :fresh, + evaluated_at: nil + } + end + + defp set!(data, changes) do + Enum.reduce(changes, data, fn {key, value}, info -> + Map.replace!(info, key, value) + end) + end + + defp set_cell_info!(data, cell_id, changes) do + update_cell_info!(data, cell_id, fn info -> + Enum.reduce(changes, info, fn {key, value}, info -> + Map.replace!(info, key, value) + end) + end) + end + + defp update_cell_info!(data, cell_id, fun) do + cell_infos = Map.update!(data.cell_infos, cell_id, fun) + set!(data, cell_infos: cell_infos) + end + + defp set_section_info!(data, section_id, changes) do + update_section_info!(data, section_id, fn info -> + Enum.reduce(changes, info, fn {key, value}, info -> + Map.replace!(info, key, value) + end) + end) + end + + defp update_section_info!(data, section_id, fun) do + section_infos = Map.update!(data.section_infos, section_id, fun) + set!(data, section_infos: section_infos) + end + + defp reduce(data, list, reducer) do + Enum.reduce(list, data, fn elem, data -> reducer.(data, elem) end) + end + + @doc """ + Finds the cell that's currently being evaluated in the given section. + """ + @spec get_evaluating_cell_id(t(), Section.id()) :: Cell.id() | nil + def get_evaluating_cell_id(data, section_id) do + info = data.section_infos[section_id] + info && info.evaluating_cell_id + end +end diff --git a/lib/live_book/session_supervisor.ex b/lib/live_book/session_supervisor.ex index f9b5b9e3f..accb8fcc6 100644 --- a/lib/live_book/session_supervisor.ex +++ b/lib/live_book/session_supervisor.ex @@ -26,7 +26,7 @@ defmodule LiveBook.SessionSupervisor do Broadcasts `{:session_created, id}` message under the `"sessions"` topic. """ - @spec create_session() :: {:ok, Session.session_id()} | {:error, any()} + @spec create_session() :: {:ok, Section.id()} | {:error, any()} def create_session() do id = Utils.random_id() @@ -52,7 +52,7 @@ defmodule LiveBook.SessionSupervisor do Broadcasts `{:session_delete, id}` message under the `"sessions"` topic. """ - @spec delete_session(Session.session_id()) :: :ok + @spec delete_session(Section.id()) :: :ok def delete_session(id) do Session.stop(id) broadcast_sessions_message({:session_deleted, id}) @@ -66,7 +66,7 @@ defmodule LiveBook.SessionSupervisor do @doc """ Returns ids of all the running session processes. """ - @spec get_session_ids() :: list(Session.session_id()) + @spec get_session_ids() :: list(Section.id()) def get_session_ids() do :global.registered_names() |> Enum.flat_map(fn @@ -78,7 +78,7 @@ defmodule LiveBook.SessionSupervisor do @doc """ Checks if a session process with the given id exists. """ - @spec session_exists?(Session.session_id()) :: boolean() + @spec session_exists?(Section.id()) :: boolean() def session_exists?(id) do :global.whereis_name({:session, id}) != :undefined end @@ -86,7 +86,7 @@ defmodule LiveBook.SessionSupervisor do @doc """ Retrieves pid of a session process identified by the given id. """ - @spec get_session_pid(Session.session_id()) :: {:ok, pid()} | {:error, :nonexistent} + @spec get_session_pid(Section.id()) :: {:ok, pid()} | {:error, :nonexistent} def get_session_pid(id) do case :global.whereis_name({:session, id}) do :undefined -> {:error, :nonexistent} diff --git a/test/live_book/evaluator_test.exs b/test/live_book/evaluator_test.exs index bcd4ad712..dc5dc5f80 100644 --- a/test/live_book/evaluator_test.exs +++ b/test/live_book/evaluator_test.exs @@ -16,44 +16,47 @@ defmodule LiveBook.EvaluatorTest do x + y """ - result = Evaluator.evaluate_code(evaluator, code, 1) + Evaluator.evaluate_code(evaluator, self(), code, :code_1) - assert result == {:ok, 3} + assert_receive {:evaluator_response, :code_1, {:ok, 3}} end test "given no prev_ref does not see previous evaluation context", %{evaluator: evaluator} do - Evaluator.evaluate_code(evaluator, "x = 1", :code_1) + Evaluator.evaluate_code(evaluator, self(), "x = 1", :code_1) + assert_receive {:evaluator_response, :code_1, _} - result = Evaluator.evaluate_code(evaluator, "x", :code_2) + Evaluator.evaluate_code(evaluator, self(), "x", :code_2) - assert {:error, _kind, %CompileError{description: "undefined function x/0"}, _stacktrace} = - result + assert_receive {:evaluator_response, :code_2, + {:error, _kind, %CompileError{description: "undefined function x/0"}, + _stacktrace}} end test "given prev_ref sees previous evaluation context", %{evaluator: evaluator} do - Evaluator.evaluate_code(evaluator, "x = 1", :code_1) + Evaluator.evaluate_code(evaluator, self(), "x = 1", :code_1) + assert_receive {:evaluator_response, :code_1, _} - result = Evaluator.evaluate_code(evaluator, "x", :code_2, :code_1) + Evaluator.evaluate_code(evaluator, self(), "x", :code_2, :code_1) - assert result == {:ok, 1} + assert_receive {:evaluator_response, :code_2, {:ok, 1}} end - test "given invalid prev_ref raises an error", %{evaluator: evaluator} do - assert_raise ArgumentError, fn -> - Evaluator.evaluate_code(evaluator, ":ok", :code_1, :code_nonexistent) - end + test "given invalid prev_ref just uses default context", %{evaluator: evaluator} do + Evaluator.evaluate_code(evaluator, self(), ":hey", :code_1, :code_nonexistent) + + assert_receive {:evaluator_response, :code_1, {:ok, :hey}} end test "captures standard output and sends it to the caller", %{evaluator: evaluator} do - Evaluator.evaluate_code(evaluator, ~s{IO.puts("hey")}, :code_1) + Evaluator.evaluate_code(evaluator, self(), ~s{IO.puts("hey")}, :code_1) - assert_received {:evaluator_stdout, :code_1, "hey\n"} + assert_receive {:evaluator_stdout, :code_1, "hey\n"} end test "using standard input results in an immediate error", %{evaluator: evaluator} do - result = Evaluator.evaluate_code(evaluator, ~s{IO.gets("> ")}, :code_1) + Evaluator.evaluate_code(evaluator, self(), ~s{IO.gets("> ")}, :code_1) - assert result == {:ok, {:error, :enotsup}} + assert_receive {:evaluator_response, :code_1, {:ok, {:error, :enotsup}}} end test "returns error along with its kind and stacktrace", %{evaluator: evaluator} do @@ -61,9 +64,10 @@ defmodule LiveBook.EvaluatorTest do List.first(%{}) """ - result = Evaluator.evaluate_code(evaluator, code, :code_1) + Evaluator.evaluate_code(evaluator, self(), code, :code_1) - assert {:error, :error, %FunctionClauseError{}, [{List, :first, 1, _location}]} = result + assert_receive {:evaluator_response, :code_1, + {:error, :error, %FunctionClauseError{}, [{List, :first, 1, _location}]}} end test "in case of an error returns only the relevant part of stacktrace", %{ @@ -87,25 +91,28 @@ defmodule LiveBook.EvaluatorTest do Cat.meow() """ - result = Evaluator.evaluate_code(evaluator, code, :code_1) + Evaluator.evaluate_code(evaluator, self(), code, :code_1) expected_stacktrace = [ {Math, :bad_math, 0, [file: 'nofile', line: 3]}, {Cat, :meow, 0, [file: 'nofile', line: 10]} ] - assert {:error, _kind, _error, ^expected_stacktrace} = result + assert_receive {:evaluator_response, :code_1, {:error, _kind, _error, ^expected_stacktrace}} end end describe "forget_evaluation/2" do test "invalidates the given reference", %{evaluator: evaluator} do - Evaluator.evaluate_code(evaluator, "x = 1", :code_1) - Evaluator.forget_evaluation(evaluator, :code_1) + Evaluator.evaluate_code(evaluator, self(), "x = 1", :code_1) + assert_receive {:evaluator_response, :code_1, _} - assert_raise ArgumentError, fn -> - Evaluator.evaluate_code(evaluator, ":ok", :code_2, :code_1) - end + Evaluator.forget_evaluation(evaluator, :code_1) + Evaluator.evaluate_code(evaluator, self(), "x", :code_2, :code_1) + + assert_receive {:evaluator_response, :code_2, + {:error, _kind, %CompileError{description: "undefined function x/0"}, + _stacktrace}} end end end diff --git a/test/live_book/session/data_test.exs b/test/live_book/session/data_test.exs new file mode 100644 index 000000000..0e6693d49 --- /dev/null +++ b/test/live_book/session/data_test.exs @@ -0,0 +1,335 @@ +defmodule LiveBook.Session.DataTest do + use ExUnit.Case, async: true + + alias LiveBook.Session.Data + + describe "apply_operation/2 given :insert_section" do + test "adds new section to notebook and session info" do + data = Data.new() + + operation = {:insert_section, 0, "s1"} + + assert {:ok, + %{ + notebook: %{ + sections: [%{id: "s1"}] + }, + section_infos: %{"s1" => _} + }} = Data.apply_operation(data, operation) + end + end + + describe "apply_operation/2 given :insert_cell" do + test "returns an error given invalid section id" do + data = Data.new() + operation = {:insert_cell, "nonexistent", 0, :elixir, "c1"} + assert :error = Data.apply_operation(data, operation) + end + + test "insert_cell adds new cell to notebook and session info" do + data = + data_after_operations!([ + {:insert_section, 0, "s1"} + ]) + + operation = {:insert_cell, "s1", 0, :elixir, "c1"} + + assert {:ok, + %{ + notebook: %{ + sections: [ + %{cells: [%{id: "c1"}]} + ] + }, + cell_infos: %{"c1" => _} + }} = Data.apply_operation(data, operation) + end + end + + describe "apply_operation/2 given :delete_section" do + test "returns an error given invalid section id" do + data = Data.new() + operation = {:delete_section, "nonexistent"} + assert :error = Data.apply_operation(data, operation) + end + + test "returns an error for an evaluating section" do + data = + data_after_operations!([ + {:insert_section, 0, "s1"}, + {:insert_cell, "s1", 0, :elixir, "c1"}, + {:queue_cell_evaluation, "c1"} + ]) + + operation = {:delete_section, "s1"} + assert :error = Data.apply_operation(data, operation) + end + + test "removes the section from notebook and session info, adds to deleted sections" do + data = + data_after_operations!([ + {:insert_section, 0, "s1"} + ]) + + operation = {:delete_section, "s1"} + empty_map = %{} + + assert {:ok, + %{ + notebook: %{ + sections: [] + }, + section_infos: ^empty_map, + deleted_sections: [%{id: "s1"}] + }} = Data.apply_operation(data, operation) + end + end + + describe "apply_operation/2 given :delete_cell" do + test "returns an error given invalid cell id" do + data = Data.new() + operation = {:delete_cell, "nonexistent"} + assert :error = Data.apply_operation(data, operation) + end + + test "returns an error for an evaluating cell" do + data = + data_after_operations!([ + {:insert_section, 0, "s1"}, + {:insert_cell, "s1", 0, :elixir, "c1"}, + {:queue_cell_evaluation, "c1"} + ]) + + operation = {:delete_cell, "c1"} + assert :error = Data.apply_operation(data, operation) + end + + test "removes the cell from notebook and session info, adds to deleted cells" do + data = + data_after_operations!([ + {:insert_section, 0, "s1"}, + {:insert_cell, "s1", 0, :elixir, "c1"} + ]) + + operation = {:delete_cell, "c1"} + empty_map = %{} + + assert {:ok, + %{ + notebook: %{ + sections: [%{cells: []}] + }, + cell_infos: ^empty_map, + deleted_cells: [%{id: "c1"}] + }} = Data.apply_operation(data, operation) + end + + test "unqueues the cell if it's queued for evaluation" do + data = + data_after_operations!([ + {:insert_section, 0, "s1"}, + {:insert_cell, "s1", 0, :elixir, "c1"}, + {:insert_cell, "s1", 1, :elixir, "c2"}, + {:queue_cell_evaluation, "c1"}, + {:queue_cell_evaluation, "c2"} + ]) + + operation = {:delete_cell, "c2"} + + assert {:ok, + %{ + section_infos: %{"s1" => %{evaluation_queue: []}} + }} = Data.apply_operation(data, operation) + end + + test "marks evaluated child cells as stale" do + data = + data_after_operations!([ + {:insert_section, 0, "s1"}, + {:insert_cell, "s1", 0, :elixir, "c1"}, + {:insert_cell, "s1", 1, :elixir, "c2"}, + # Evaluate both cells + {:queue_cell_evaluation, "c1"}, + {:add_cell_evaluation_response, "c1", {:ok, [1, 2, 3]}}, + {:queue_cell_evaluation, "c2"}, + {:add_cell_evaluation_response, "c2", {:ok, [1, 2, 3]}} + ]) + + operation = {:delete_cell, "c1"} + + assert {:ok, + %{ + cell_infos: %{"c2" => %{status: :stale}} + }} = Data.apply_operation(data, operation) + end + end + + describe "apply_operation/2 given :queue_cell_evaluation" do + test "returns an error given invalid cell id" do + data = Data.new() + operation = {:queue_cell_evaluation, "nonexistent"} + assert :error = Data.apply_operation(data, operation) + end + + test "returns an error given non-elixir cell" do + data = + data_after_operations!([ + {:insert_section, 0, "s1"}, + {:insert_cell, "s1", 0, :markdown, "c1"} + ]) + + operation = {:queue_cell_evaluation, "c1"} + assert :error = Data.apply_operation(data, operation) + end + + test "returns an error for an evaluating cell" do + data = + data_after_operations!([ + {:insert_section, 0, "s1"}, + {:insert_cell, "s1", 0, :elixir, "c1"}, + {:queue_cell_evaluation, "c1"} + ]) + + operation = {:queue_cell_evaluation, "c1"} + assert :error = Data.apply_operation(data, operation) + end + + test "marks the cell as evaluating if the corresponding section is idle" do + data = + data_after_operations!([ + {:insert_section, 0, "s1"}, + {:insert_cell, "s1", 0, :elixir, "c1"} + ]) + + operation = {:queue_cell_evaluation, "c1"} + + assert {:ok, + %{ + cell_infos: %{"c1" => %{status: :evaluating}}, + section_infos: %{"s1" => %{evaluating_cell_id: "c1", evaluation_queue: []}} + }} = Data.apply_operation(data, operation) + end + + test "marks the cell as queued if the corresponding section is already evaluating" do + data = + data_after_operations!([ + {:insert_section, 0, "s1"}, + {:insert_cell, "s1", 0, :elixir, "c1"}, + {:insert_cell, "s1", 1, :elixir, "c2"}, + {:queue_cell_evaluation, "c1"} + ]) + + operation = {:queue_cell_evaluation, "c2"} + + assert {:ok, + %{ + cell_infos: %{"c2" => %{status: :queued}}, + section_infos: %{"s1" => %{evaluating_cell_id: "c1", evaluation_queue: ["c2"]}} + }} = Data.apply_operation(data, operation) + end + end + + describe "apply_operation/2 given :add_cell_evaluation_stdout" do + test "update the cell output" do + # TODO assert against output being updated once we do so + end + end + + describe "apply_operation/2 given :add_cell_evaluation_response" do + test "update the cell output" do + # TODO assert against output being updated once we do so + end + + test "marks the cell as evaluated" do + data = + data_after_operations!([ + {:insert_section, 0, "s1"}, + {:insert_cell, "s1", 0, :elixir, "c1"}, + {:insert_cell, "s1", 1, :elixir, "c2"}, + {:queue_cell_evaluation, "c1"} + ]) + + operation = {:add_cell_evaluation_response, "c1", {:ok, [1, 2, 3]}} + + assert {:ok, + %{ + cell_infos: %{"c1" => %{status: :evaluated}}, + section_infos: %{"s1" => %{evaluating_cell_id: nil, evaluation_queue: []}} + }} = Data.apply_operation(data, operation) + end + + test "marks next queued cell in this section as evaluating if there is one" do + data = + data_after_operations!([ + {:insert_section, 0, "s1"}, + {:insert_cell, "s1", 0, :elixir, "c1"}, + {:insert_cell, "s1", 1, :elixir, "c2"}, + {:queue_cell_evaluation, "c1"}, + {:queue_cell_evaluation, "c2"} + ]) + + operation = {:add_cell_evaluation_response, "c1", {:ok, [1, 2, 3]}} + + assert {:ok, + %{ + cell_infos: %{"c2" => %{status: :evaluating}}, + section_infos: %{"s1" => %{evaluating_cell_id: "c2", evaluation_queue: []}} + }} = Data.apply_operation(data, operation) + end + + test "if parent cells are not executed, marks them for evaluation first" do + data = + data_after_operations!([ + {:insert_section, 0, "s1"}, + {:insert_cell, "s1", 0, :elixir, "c1"}, + {:insert_cell, "s1", 1, :elixir, "c2"} + ]) + + operation = {:queue_cell_evaluation, "c2"} + + assert {:ok, + %{ + cell_infos: %{ + "c1" => %{status: :evaluating}, + "c2" => %{status: :queued} + }, + section_infos: %{"s1" => %{evaluating_cell_id: "c1", evaluation_queue: ["c2"]}} + }} = Data.apply_operation(data, operation) + end + + test "marks evaluated child cells as stale" do + data = + data_after_operations!([ + {:insert_section, 0, "s1"}, + {:insert_cell, "s1", 0, :elixir, "c1"}, + {:insert_cell, "s1", 1, :elixir, "c2"}, + # Evaluate both cells + {:queue_cell_evaluation, "c1"}, + {:add_cell_evaluation_response, "c1", {:ok, [1, 2, 3]}}, + {:queue_cell_evaluation, "c2"}, + {:add_cell_evaluation_response, "c2", {:ok, [1, 2, 3]}}, + # Queue the first cell again + {:queue_cell_evaluation, "c1"} + ]) + + operation = {:add_cell_evaluation_response, "c1", {:ok, [1, 2, 3]}} + + assert {:ok, + %{ + cell_infos: %{"c2" => %{status: :stale}} + }} = Data.apply_operation(data, operation) + end + end + + defp data_after_operations!(operations) do + Enum.reduce(operations, Data.new(), fn operation, data -> + case Data.apply_operation(data, operation) do + {:ok, data} -> + data + + :error -> + raise "failed to set up test data, operation #{inspect(operation)} returned an error" + end + end) + end +end diff --git a/test/live_book/session_test.exs b/test/live_book/session_test.exs new file mode 100644 index 000000000..5bac527d1 --- /dev/null +++ b/test/live_book/session_test.exs @@ -0,0 +1,83 @@ +defmodule LiveBook.SessionTest do + use ExUnit.Case, async: true + + alias LiveBook.Session + + setup do + {:ok, _} = Session.start_link("1") + %{session_id: "1"} + end + + describe "insert_section/2" do + test "sends an insert opreation to subscribers", %{session_id: session_id} do + Phoenix.PubSub.subscribe(LiveBook.PubSub, "sessions:#{session_id}") + + Session.insert_section(session_id, 0) + assert_receive {:operation, {:insert_section, 0, _id}} + end + end + + describe "insert_cell/4" do + test "sends an insert opreation to subscribers", %{session_id: session_id} do + Phoenix.PubSub.subscribe(LiveBook.PubSub, "sessions:#{session_id}") + + Session.insert_section(session_id, 0) + assert_receive {:operation, {:insert_section, 0, section_id}} + + Session.insert_cell(session_id, section_id, 0, :elixir) + assert_receive {:operation, {:insert_cell, ^section_id, 0, :elixir, _id}} + end + end + + describe "delete_section/2" do + test "sends a delete opreation to subscribers", %{session_id: session_id} do + Phoenix.PubSub.subscribe(LiveBook.PubSub, "sessions:#{session_id}") + + {section_id, _cell_id} = insert_section_and_cell(session_id) + + Session.delete_section(session_id, section_id) + assert_receive {:operation, {:delete_section, ^section_id}} + end + end + + describe "delete_cell/2" do + test "sends a delete opreation to subscribers", %{session_id: session_id} do + Phoenix.PubSub.subscribe(LiveBook.PubSub, "sessions:#{session_id}") + + {_section_id, cell_id} = insert_section_and_cell(session_id) + + Session.delete_cell(session_id, cell_id) + assert_receive {:operation, {:delete_cell, ^cell_id}} + end + end + + describe "queue_cell_evaluation/2" do + test "sends a queue evaluation operation to subscribers", %{session_id: session_id} do + Phoenix.PubSub.subscribe(LiveBook.PubSub, "sessions:#{session_id}") + + {_section_id, cell_id} = insert_section_and_cell(session_id) + + Session.queue_cell_evaluation(session_id, cell_id) + assert_receive {:operation, {:queue_cell_evaluation, ^cell_id}} + end + + test "triggers evaluation and sends update operation once it finishes", + %{session_id: session_id} do + Phoenix.PubSub.subscribe(LiveBook.PubSub, "sessions:#{session_id}") + + {_section_id, cell_id} = insert_section_and_cell(session_id) + + Session.queue_cell_evaluation(session_id, cell_id) + assert_receive {:operation, {:add_cell_evaluation_response, ^cell_id, _}} + end + end + + defp insert_section_and_cell(session_id) do + Session.insert_section(session_id, 0) + assert_receive {:operation, {:insert_section, 0, section_id}} + Session.insert_cell(session_id, section_id, 0, :elixir) + assert_receive {:operation, {:insert_cell, ^section_id, 0, :elixir, cell_id}} + + {section_id, cell_id} + end +end diff --git a/test/test_helper.exs b/test/test_helper.exs index 869559e70..7eb55657b 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1 +1 @@ -ExUnit.start() +ExUnit.start(assert_receive_timeout: 500)