Introduce branching sections (#449)

* Introduce branching sections

* parent_index -> branch_parent_index

* Flip the branch icon

* Don't mark branching sections as aborted if the main flow crashes

* Outline more details about branching sections in the Elixir and Livebook notebook

* Add branch indicator to the sections sidebar
This commit is contained in:
Jonatan Kłosko 2021-07-15 18:19:36 +02:00 committed by GitHub
parent a386f61d1d
commit d55c4a1ccc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
27 changed files with 2129 additions and 384 deletions

View file

@ -109,7 +109,7 @@ solely client-side operations.
[data-element="section-headline"]:not(:hover)
[data-element="section-name"]:not(:focus)
+ [data-element="section-actions"] {
+ [data-element="section-actions"]:not(:focus-within) {
@apply hidden;
}

View file

@ -18,4 +18,8 @@
box-shadow: 0 0 25px -5px rgba(0, 0, 0, 0.1),
0 0 10px -5px rgba(0, 0, 0, 0.04);
}
.flip-horizontally {
transform: scaleY(-1);
}
}

View file

@ -21,6 +21,7 @@ defmodule Livebook.Evaluator do
formatter: module(),
io_proxy: pid(),
contexts: %{ref() => context()},
initial_context: context(),
# We track the widgets rendered by every evaluation,
# so that we can kill those no longer needed
widget_pids: %{ref() => MapSet.t(pid())},
@ -30,7 +31,7 @@ defmodule Livebook.Evaluator do
@typedoc """
An evaluation context.
"""
@type context :: %{binding: Code.binding(), env: Macro.Env.t()}
@type context :: %{binding: Code.binding(), env: Macro.Env.t(), id: binary()}
@typedoc """
A term used to identify evaluation.
@ -72,14 +73,41 @@ defmodule Livebook.Evaluator do
## Options
* `:file` - file to which the evaluated code belongs. Most importantly,
this has an impact on the value of `__DIR__`.
* `:file` - file to which the evaluated code belongs. Most importantly,
this has an impact on the value of `__DIR__`.
"""
@spec evaluate_code(t(), pid(), String.t(), ref(), ref() | nil, keyword()) :: :ok
def evaluate_code(evaluator, send_to, code, ref, prev_ref \\ nil, opts \\ []) when ref != nil do
GenServer.cast(evaluator, {:evaluate_code, send_to, code, ref, prev_ref, opts})
end
@doc """
Fetches evaluation context (binding and environment) by evaluation reference.
## Options
* `cached_id` - id of context that the sender may already have,
if it matches the fetched context the `{:error, :not_modified}`
tuple is returned instead
"""
@spec fetch_evaluation_context(t(), ref(), keyword()) ::
{:ok, context()} | {:error, :not_modified}
def fetch_evaluation_context(evaluator, ref, opts \\ []) do
cached_id = opts[:cached_id]
GenServer.call(evaluator, {:fetch_evaluation_context, ref, cached_id})
end
@doc """
Fetches an evalutaion context from another `Evaluator` process
and configures it as the initial context for this evaluator.
The process dictionary is also copied to match the given evaluator.
"""
@spec initialize_from(t(), t(), ref()) :: :ok
def initialize_from(evaluator, source_evaluator, source_evaluation_ref) do
GenServer.call(evaluator, {:initialize_from, source_evaluator, source_evaluation_ref})
end
@doc """
Removes the evaluation identified by `ref` from history,
so that further evaluations cannot use it.
@ -120,6 +148,7 @@ defmodule Livebook.Evaluator do
formatter: formatter,
io_proxy: io_proxy,
contexts: %{},
initial_context: initial_context(),
widget_pids: %{},
widget_counts: %{}
}
@ -127,14 +156,14 @@ defmodule Livebook.Evaluator do
defp initial_context() do
env = :elixir.env_for_eval([])
%{binding: [], env: env}
%{binding: [], env: env, id: random_id()}
end
@impl true
def handle_cast({:evaluate_code, send_to, code, ref, prev_ref, opts}, state) do
Evaluator.IOProxy.configure(state.io_proxy, send_to, ref)
context = Map.get_lazy(state.contexts, prev_ref, fn -> initial_context() end)
context = get_context(state, prev_ref)
file = Keyword.get(opts, :file, "nofile")
context = put_in(context.env.file, file)
start_time = System.monotonic_time()
@ -142,7 +171,7 @@ defmodule Livebook.Evaluator do
{result_context, response} =
case eval(code, context.binding, context.env) do
{:ok, result, binding, env} ->
result_context = %{binding: binding, env: env}
result_context = %{binding: binding, env: env, id: random_id()}
response = {:ok, result}
{result_context, response}
@ -178,13 +207,50 @@ defmodule Livebook.Evaluator do
end
def handle_cast({:request_completion_items, send_to, ref, hint, evaluation_ref}, state) do
context = Map.get_lazy(state.contexts, evaluation_ref, fn -> initial_context() end)
context = get_context(state, evaluation_ref)
items = Livebook.Completion.get_completion_items(hint, context.binding, context.env)
send(send_to, {:completion_response, ref, items})
{:noreply, state}
end
@impl true
def handle_call({:fetch_evaluation_context, ref, cached_id}, _from, state) do
context = get_context(state, ref)
reply =
if context.id == cached_id do
{:error, :not_modified}
else
{:ok, context}
end
{:reply, reply, state}
end
def handle_call({:initialize_from, source_evaluator, source_evaluation_ref}, _from, state) do
state =
case Evaluator.fetch_evaluation_context(
source_evaluator,
source_evaluation_ref,
cached_id: state.initial_context.id
) do
{:ok, context} ->
# If the context changed, mirror the process dictionary again
copy_process_dictionary_from(source_evaluator)
put_in(state.initial_context, context)
{:error, :not_modified} ->
state
end
{:reply, :ok, state}
end
defp get_context(state, ref) do
Map.get_lazy(state.contexts, ref, fn -> state.initial_context end)
end
defp eval(code, binding, env) do
try do
quoted = Code.string_to_quoted!(code)
@ -226,6 +292,21 @@ defmodule Livebook.Evaluator do
|> Enum.reject(&(elem(&1, 0) in @elixir_internals))
end
defp random_id() do
:crypto.strong_rand_bytes(20) |> Base.encode32(case: :lower)
end
defp copy_process_dictionary_from(pid) do
{:dictionary, dictionary} = Process.info(pid, :dictionary)
for {key, value} <- dictionary, not internal_dictionary_key?(key) do
Process.put(key, value)
end
end
defp internal_dictionary_key?("$" <> _), do: true
defp internal_dictionary_key?(_), do: false
# Widgets
defp track_evaluation_widgets(state, ref, widget_pids, output) do

View file

@ -15,20 +15,30 @@ defmodule Livebook.LiveMarkdown.Export do
defp render_notebook(notebook) do
name = "# #{notebook.name}"
sections = Enum.map(notebook.sections, &render_section/1)
sections = Enum.map(notebook.sections, &render_section(&1, notebook))
[name | sections]
|> Enum.intersperse("\n\n")
|> prepend_metadata(notebook.metadata)
end
defp render_section(section) do
defp render_section(section, notebook) do
name = "## #{section.name}"
cells = Enum.map(section.cells, &render_cell/1)
metadata = section_metadata(section, notebook)
[name | cells]
|> Enum.intersperse("\n\n")
|> prepend_metadata(section.metadata)
|> prepend_metadata(metadata)
end
defp section_metadata(%{parent_id: nil} = section, _notebook) do
section.metadata
end
defp section_metadata(section, notebook) do
parent_idx = Notebook.section_index(notebook, section.parent_id)
Map.put(section.metadata, "branch_parent_index", parent_idx)
end
defp render_cell(%Cell.Markdown{} = cell) do

View file

@ -19,6 +19,7 @@ defmodule Livebook.LiveMarkdown.Import do
ast
|> group_elements()
|> build_notebook()
|> postprocess_notebook()
{notebook, earmark_messages ++ rewrite_messages}
end
@ -259,4 +260,16 @@ defmodule Livebook.LiveMarkdown.Import do
{key, value}
end)
end
defp postprocess_notebook(notebook) do
sections =
Enum.map(notebook.sections, fn section ->
# Set parent_id based on the persisted branch_parent_index if present
{parent_idx, metadata} = Map.pop(section.metadata, "branch_parent_index")
parent = parent_idx && Enum.at(notebook.sections, parent_idx)
%{section | metadata: metadata, parent_id: parent && parent.id}
end)
%{notebook | sections: sections}
end
end

View file

@ -16,6 +16,7 @@ defmodule Livebook.Notebook do
defstruct [:name, :version, :sections, :metadata]
alias Livebook.Notebook.{Section, Cell}
alias Livebook.Utils.Graph
import Livebook.Utils, only: [access_by_id: 1]
@type metadata :: %{String.t() => term()}
@ -248,6 +249,46 @@ defmodule Livebook.Notebook do
index |> max(0) |> min(length(list) - 1)
end
@doc """
Checks if `section` can be moved by `offset`.
Specifically, this function checks if after the move
all child sections are still below their parent sections.
"""
@spec can_move_section_by?(t(), Section.t(), integer()) :: boolean()
def can_move_section_by?(notebook, section, offset)
def can_move_section_by?(notebook, %{parent_id: nil} = section, offset) do
notebook.sections
|> Enum.with_index()
|> Enum.filter(fn {that_section, _idx} -> that_section.parent_id == section.id end)
|> Enum.map(fn {_section, idx} -> idx end)
|> case do
[] ->
true
child_indices ->
section_idx = section_index(notebook, section.id)
section_idx + offset < Enum.min(child_indices)
end
end
def can_move_section_by?(notebook, section, offset) do
parent_idx = section_index(notebook, section.parent_id)
section_idx = section_index(notebook, section.id)
parent_idx < section_idx + offset
end
@doc """
Returns sections that are valid parents for the given section.
"""
@spec valid_parents_for(t(), Section.id()) :: list(Section.t())
def valid_parents_for(notebook, section_id) do
notebook.sections
|> Enum.take_while(&(&1.id != section_id))
|> Enum.filter(&(&1.parent_id == nil))
end
@doc """
Moves section by the given offset.
"""
@ -257,11 +298,7 @@ defmodule Livebook.Notebook do
# Then we find its' new index from given offset.
# Finally, we move the section, and return the new notebook.
idx =
Enum.find_index(notebook.sections, fn
section -> section.id == section_id
end)
idx = section_index(notebook, section_id)
new_idx = (idx + offset) |> clamp_index(notebook.sections)
{section, sections} = List.pop_at(notebook.sections, idx)
@ -298,9 +335,17 @@ defmodule Livebook.Notebook do
"""
@spec parent_cells_with_section(t(), Cell.id()) :: list({Cell.t(), Section.t()})
def parent_cells_with_section(notebook, cell_id) do
parent_cell_ids =
notebook
|> cell_dependency_graph()
|> Graph.find_path(cell_id, nil)
|> MapSet.new()
|> MapSet.delete(cell_id)
|> MapSet.delete(nil)
notebook
|> cells_with_section()
|> Enum.take_while(fn {cell, _} -> cell.id != cell_id end)
|> Enum.filter(fn {cell, _} -> MapSet.member?(parent_cell_ids, cell.id) end)
|> Enum.reverse()
end
@ -312,10 +357,87 @@ defmodule Livebook.Notebook do
"""
@spec child_cells_with_section(t(), Cell.id()) :: list({Cell.t(), Section.t()})
def child_cells_with_section(notebook, cell_id) do
graph = cell_dependency_graph(notebook)
child_cell_ids =
graph
|> Graph.leaves()
|> Enum.flat_map(&Graph.find_path(graph, &1, cell_id))
|> MapSet.new()
|> MapSet.delete(cell_id)
notebook
|> cells_with_section()
|> Enum.drop_while(fn {cell, _} -> cell.id != cell_id end)
|> Enum.drop(1)
|> Enum.filter(fn {cell, _} -> MapSet.member?(child_cell_ids, cell.id) end)
end
@doc """
Computes cell dependency graph.
Every cell has one or none parent cells, so the graph
is represented as a map, with cell id as the key and
its parent cell id as the value. Cells with no parent
are also included with the value of `nil`.
## Options
* `:cell_filter` - a function determining if the given
cell should be included in the graph. If a cell is
excluded, transitive parenthood still applies.
By default all cells are included.
"""
@spec cell_dependency_graph(t()) :: Graph.t(Cell.id())
def cell_dependency_graph(notebook, opts \\ []) do
notebook.sections
|> Enum.reduce(
{%{}, nil, %{}},
fn section, {graph, prev_regular_section, last_id_by_section} ->
prev_section_id =
if section.parent_id,
do: section.parent_id,
else: prev_regular_section && prev_regular_section.id
# Cell that this section directly depends on,
# if the section it's empty it's last id of the previous section
prev_cell_id = prev_section_id && last_id_by_section[prev_section_id]
{graph, last_cell_id} =
if filter = opts[:cell_filter] do
Enum.filter(section.cells, filter)
else
section.cells
end
|> Enum.map(& &1.id)
|> Enum.reduce({graph, prev_cell_id}, fn cell_id, {graph, prev_cell_id} ->
{put_in(graph[cell_id], prev_cell_id), cell_id}
end)
last_id_by_section = put_in(last_id_by_section[section.id], last_cell_id)
{
graph,
if(section.parent_id, do: prev_regular_section, else: section),
last_id_by_section
}
end
)
|> elem(0)
end
@doc """
Returns index of the given section or `nil` if not found.
"""
@spec section_index(t(), Section.id()) :: non_neg_integer() | nil
def section_index(notebook, section_id) do
Enum.find_index(notebook.sections, &(&1.id == section_id))
end
@doc """
Returns a list of sections branching from the given one.
"""
@spec child_sections(t(), Section.id()) :: list(Section.t())
def child_sections(notebook, section_id) do
Enum.filter(notebook.sections, &(&1.parent_id == section_id))
end
@doc """

View file

@ -106,6 +106,48 @@ You can also choose to run inside a *Mix* project (as you would with `iex -S mix
manually *attach* to an existing distributed node, or run your Elixir notebook
*embedded* within the Livebook source itself.
## More on branches #1
We already mentioned branching sections in
[Welcome to Livebook](/explore/notebooks/intro-to-livebook),
but in Elixir terms each branching section:
* runs in a separate process from the main flow
* copies relevant bindings, imports and alises from the parent
* updates its process dictionary to mirror the parent
Let's see this in practice:
```elixir
parent = self()
```
```elixir
Process.put(:info, "deal carefully with process dictionaries")
```
<!-- livebook:{"branch_parent_index":5} -->
## More on branches #2
```elixir
parent
```
```elixir
self()
```
```elixir
Process.get(:info)
```
Since this branch is a separate process, a crash has limited scope:
```elixir
Process.exit(self(), :kill)
```
## Inputs
Livebook supports inputs and you read the input values directly
@ -148,8 +190,8 @@ cell on the bottom-right of the cell. After evaluation, the total
time can be seen by hovering the green dot.
However, it is important to remember that all code outside of
a module in Elixir is _evaluated_, and therefore executes much
slower than code defined inside modules, which are _compiled_.
a module in Elixir is *evaluated*, and therefore executes much
slower than code defined inside modules, which are *compiled*.
Let's see an example. Run the cell below:

View file

@ -60,6 +60,32 @@ for _ <- 1..3 do
end
```
<!-- livebook:{"branch_parent_index":2} -->
## Branching sections
Additionally, you can make a section **branch out** from any
previous regular section. Hover over the section name to reveal
available actions and click on the branch icon to select the
parent section.
You still have access to all the previous data:
```elixir
{message, cats}
```
The important characteristic of a branching section is that
it runs independently from other sections and as such is well
suited for running long computations "in backgroud".
```elixir
Process.sleep(300_000)
```
Having this cell running, feel free to insert another Elixir cell
in the section below and see it evaluates immediately.
## Notebook files
By default notebooks are kept in memory, which is fine for interactive hacking,

View file

@ -5,8 +5,12 @@ defmodule Livebook.Notebook.Section do
#
# Each section contains a number of cells and serves as a way
# of grouping related cells.
#
# A section may optionally have a parent, in which case it's
# a branching section. Such section logically follows its
# parent section and has no impact on any further sections.
defstruct [:id, :name, :cells, :metadata]
defstruct [:id, :name, :cells, :parent_id, :metadata]
alias Livebook.Notebook.Cell
alias Livebook.Utils
@ -18,6 +22,7 @@ defmodule Livebook.Notebook.Section do
id: id(),
name: String.t(),
cells: list(Cell.t()),
parent_id: id() | nil,
metadata: metadata()
}
@ -30,6 +35,7 @@ defmodule Livebook.Notebook.Section do
id: Utils.random_id(),
name: "Section",
cells: [],
parent_id: nil,
metadata: %{}
}
end

View file

@ -8,9 +8,27 @@ defprotocol Livebook.Runtime do
# however the protocol does not require that.
@typedoc """
A term used to identify evaluation or container.
An arbitrary term identifying an evaluation container.
A container is an abstraction of an isolated group of evaluations.
Containers are mostly independent and consequently can be evaluated
concurrently if possible.
Note that every evaluation can use the resulting environment
and bindings of any previous evaluation, even from a different
container.
"""
@type ref :: term()
@type container_ref :: term()
@typedoc """
An arbitrary term identifying an evaluation.
"""
@type evaluation_ref :: term()
@typedoc """
A pair identifying evaluation together with its container.
"""
@type locator :: {container_ref(), evaluation_ref() | nil}
@typedoc """
A single completion result.
@ -28,9 +46,12 @@ defprotocol Livebook.Runtime do
@doc """
Sets the caller as runtime owner.
The runtime most likely has some kind of leading process,
this method starts monitoring it and returns the monitor reference,
so the caller knows if the runtime is down by listening to a :DOWN message.
It's advised for each runtime to have a leading process
that is coupled to the lifetime of the underlying runtime
resources. In this case the `connect` function may start
monitoring that process and return the monitor reference.
This way the caller is notified when the runtime goes down
by listening to the :DOWN message.
"""
@spec connect(t()) :: reference()
def connect(runtime)
@ -46,69 +67,83 @@ defprotocol Livebook.Runtime do
@doc """
Asynchronously parses and evaluates the given code.
Container isolates a group of evaluations. Every evaluation can use previous
evaluation's environment and bindings, as long as they belong to the same container.
The given `locator` identifies the container where
the code should be evaluated as well as the evaluation
reference to store the resulting contxt under.
Evaluation outputs are send to the connected runtime owner.
Additionally, `prev_locator` points to a previous
evaluation to be used as the starting point of this
evaluation. If not applicable, the previous evaluation
reference may be specified as `nil`.
## Communication
Evaluation outputs are sent to the connected runtime owner.
The messages should be of the form:
* `{:evaluation_output, ref, output}` - output captured during evaluation
* `{:evaluation_response, ref, output, metadata}` - final result of the evaluation, recognised metadata entries are: `evaluation_time_ms`
* `{:evaluation_output, ref, output}` - output captured
during evaluation
The evaluation may request user input by sending `{:evaluation_input, ref, reply_to, prompt}`
to the runtime owner, who is supposed to reply with `{:evaluation_input_reply, reply}`
with `reply` being either `{:ok, input}` or `:error` if no matching input can be found.
* `{:evaluation_response, ref, output, metadata}` - final
result of the evaluation. Recognised metadata entries
are: `evaluation_time_ms`
If the evaluation state within a container is lost (e.g. a process goes down),
the runtime can send `{:container_down, container_ref, message}` to notify the owner.
The evaluation may request user input by sending
`{:evaluation_input, ref, reply_to, prompt}` to the runtime owner,
which is supposed to reply with `{:evaluation_input_reply, reply}`
where `reply` is either `{:ok, input}` or `:error` if no matching
input can be found.
In all of the above `ref` is the evaluation reference.
If the evaluation state within a container is lost (for example
a process goes down), the runtime may send `{:container_down, container_ref, message}`
to notify the owner.
## Options
* `:file` - file to which the evaluated code belongs. Most importantly,
this has an impact on the value of `__DIR__`.
"""
@spec evaluate_code(t(), String.t(), ref(), ref(), ref() | nil, keyword()) :: :ok
def evaluate_code(runtime, code, container_ref, evaluation_ref, prev_evaluation_ref, opts \\ [])
@spec evaluate_code(t(), String.t(), locator(), locator(), keyword()) :: :ok
def evaluate_code(runtime, code, locator, prev_locator, opts \\ [])
@doc """
Disposes of evaluation identified by the given ref.
Disposes of an evaluation identified by the given locator.
This should be used to cleanup resources related to old evaluation if no longer needed.
This can be used to cleanup resources related to an old evaluation
if no longer needed.
"""
@spec forget_evaluation(t(), ref(), ref()) :: :ok
def forget_evaluation(runtime, container_ref, evaluation_ref)
@spec forget_evaluation(t(), locator()) :: :ok
def forget_evaluation(runtime, locator)
@doc """
Disposes of evaluation container identified by the given ref.
Disposes of an evaluation container identified by the given ref.
This should be used to cleanup resources keeping track
of the container and contained evaluations.
This should be used to cleanup resources keeping track of the
container all of its evaluations.
"""
@spec drop_container(t(), ref()) :: :ok
@spec drop_container(t(), container_ref()) :: :ok
def drop_container(runtime, container_ref)
@doc """
Asynchronously finds completion items matching the given `hint` text.
The given `{container_ref, evaluation_ref}` pair idenfities an evaluation,
which bindings and environment are used to provide a more relevant completion results.
If there's no appropriate evaluation, `nil` refs can be provided.
The given `locator` idenfities an evaluation, which bindings
and environment are used to provide a more relevant completion
results. If there's no appropriate evaluation, `nil` refs can
be provided.
Completion response is sent to the `send_to` process as `{:completion_response, ref, items}`,
where `items` is a list of `Livebook.Runtime.completion_item()`.
Completion response is sent to the `send_to` process as
`{:completion_response, ref, items}`, where `items` is a
list of `t:Livebook.Runtime.completion_item/0`.
"""
@spec request_completion_items(t(), pid(), term(), String.t(), ref() | nil, ref() | nil) :: :ok
def request_completion_items(
runtime,
send_to,
ref,
hint,
container_ref,
evaluation_ref
)
@spec request_completion_items(t(), pid(), term(), String.t(), locator()) :: :ok
def request_completion_items(runtime, send_to, completion_ref, hint, locator)
@doc """
Synchronously starts a runtime of the same type with the same parameters.
Synchronously starts a runtime of the same type with the
same parameters.
"""
@spec duplicate(Runtime.t()) :: {:ok, Runtime.t()} | {:error, String.t()}
def duplicate(runtime)

View file

@ -49,40 +49,25 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Attached do
ErlDist.RuntimeServer.stop(runtime.server_pid)
end
def evaluate_code(
runtime,
code,
container_ref,
evaluation_ref,
prev_evaluation_ref,
opts \\ []
) do
ErlDist.RuntimeServer.evaluate_code(
runtime.server_pid,
code,
container_ref,
evaluation_ref,
prev_evaluation_ref,
opts
)
def evaluate_code(runtime, code, locator, prev_locator, opts \\ []) do
ErlDist.RuntimeServer.evaluate_code(runtime.server_pid, code, locator, prev_locator, opts)
end
def forget_evaluation(runtime, container_ref, evaluation_ref) do
ErlDist.RuntimeServer.forget_evaluation(runtime.server_pid, container_ref, evaluation_ref)
def forget_evaluation(runtime, locator) do
ErlDist.RuntimeServer.forget_evaluation(runtime.server_pid, locator)
end
def drop_container(runtime, container_ref) do
ErlDist.RuntimeServer.drop_container(runtime.server_pid, container_ref)
end
def request_completion_items(runtime, send_to, ref, hint, container_ref, evaluation_ref) do
def request_completion_items(runtime, send_to, completion_ref, hint, locator) do
ErlDist.RuntimeServer.request_completion_items(
runtime.server_pid,
send_to,
ref,
completion_ref,
hint,
container_ref,
evaluation_ref
locator
)
end

View file

@ -77,40 +77,25 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.ElixirStandalone do
ErlDist.RuntimeServer.stop(runtime.server_pid)
end
def evaluate_code(
runtime,
code,
container_ref,
evaluation_ref,
prev_evaluation_ref,
opts \\ []
) do
ErlDist.RuntimeServer.evaluate_code(
runtime.server_pid,
code,
container_ref,
evaluation_ref,
prev_evaluation_ref,
opts
)
def evaluate_code(runtime, code, locator, prev_locator, opts \\ []) do
ErlDist.RuntimeServer.evaluate_code(runtime.server_pid, code, locator, prev_locator, opts)
end
def forget_evaluation(runtime, container_ref, evaluation_ref) do
ErlDist.RuntimeServer.forget_evaluation(runtime.server_pid, container_ref, evaluation_ref)
def forget_evaluation(runtime, locator) do
ErlDist.RuntimeServer.forget_evaluation(runtime.server_pid, locator)
end
def drop_container(runtime, container_ref) do
ErlDist.RuntimeServer.drop_container(runtime.server_pid, container_ref)
end
def request_completion_items(runtime, send_to, ref, hint, container_ref, evaluation_ref) do
def request_completion_items(runtime, send_to, completion_ref, hint, locator) do
ErlDist.RuntimeServer.request_completion_items(
runtime.server_pid,
send_to,
ref,
completion_ref,
hint,
container_ref,
evaluation_ref
locator
)
end

View file

@ -51,40 +51,25 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Embedded do
ErlDist.RuntimeServer.stop(runtime.server_pid)
end
def evaluate_code(
runtime,
code,
container_ref,
evaluation_ref,
prev_evaluation_ref,
opts \\ []
) do
ErlDist.RuntimeServer.evaluate_code(
runtime.server_pid,
code,
container_ref,
evaluation_ref,
prev_evaluation_ref,
opts
)
def evaluate_code(runtime, code, locator, prev_locator, opts \\ []) do
ErlDist.RuntimeServer.evaluate_code(runtime.server_pid, code, locator, prev_locator, opts)
end
def forget_evaluation(runtime, container_ref, evaluation_ref) do
ErlDist.RuntimeServer.forget_evaluation(runtime.server_pid, container_ref, evaluation_ref)
def forget_evaluation(runtime, locator) do
ErlDist.RuntimeServer.forget_evaluation(runtime.server_pid, locator)
end
def drop_container(runtime, container_ref) do
ErlDist.RuntimeServer.drop_container(runtime.server_pid, container_ref)
end
def request_completion_items(runtime, send_to, ref, hint, container_ref, evaluation_ref) do
def request_completion_items(runtime, send_to, completion_ref, hint, locator) do
ErlDist.RuntimeServer.request_completion_items(
runtime.server_pid,
send_to,
ref,
completion_ref,
hint,
container_ref,
evaluation_ref
locator
)
end

View file

@ -5,14 +5,18 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
#
# This process handles `Livebook.Runtime` operations,
# like evaluation and completion. It spawns/terminates
# individual evaluators as necessary.
# individual evaluators corresponding to evaluation
# containers as necessary.
#
# Every runtime server must have an owner process,
# to which the server lifetime is bound.
#
# For more specification see `Livebook.Runtime`.
use GenServer, restart: :temporary
alias Livebook.Evaluator
alias Livebook.Runtime
alias Livebook.Runtime.ErlDist
@await_owner_timeout 5_000
@ -20,7 +24,7 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
@doc """
Starts the manager.
Note: make sure to call `set_owner` within `@await_owner_timeout`
Note: make sure to call `set_owner` within #{@await_owner_timeout}ms
or the runtime server assumes it's not needed and terminates.
"""
def start_link(opts \\ []) do
@ -40,77 +44,60 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
end
@doc """
Evaluates the given code using an `Evaluator` process
belonging to the given `container_ref` and instructs
Evaluates the given code using an `Livebook.Evaluator`
process belonging to the given container and instructs
it to send all the outputs to the owner process.
If that's the first evaluation for this `container_ref`,
a new evaluator is started.
If no evaluator exists for the given container, a new
one is started.
See `Evaluator` for more details.
See `Livebook.Evaluator` for more details.
"""
@spec evaluate_code(
pid(),
String.t(),
Evaluator.ref(),
Evaluator.ref(),
Evaluator.ref() | nil,
keyword()
) :: :ok
def evaluate_code(pid, code, container_ref, evaluation_ref, prev_evaluation_ref, opts \\ []) do
GenServer.cast(
pid,
{:evaluate_code, code, container_ref, evaluation_ref, prev_evaluation_ref, opts}
)
@spec evaluate_code(pid(), String.t(), Runtime.locator(), Runtime.locator(), keyword()) :: :ok
def evaluate_code(pid, code, locator, prev_locator, opts \\ []) do
GenServer.cast(pid, {:evaluate_code, code, locator, prev_locator, opts})
end
@doc """
Removes the specified evaluation from the history.
See `Evaluator` for more details.
See `Livebook.Evaluator` for more details.
"""
@spec forget_evaluation(pid(), Evaluator.ref(), Evaluator.ref()) :: :ok
def forget_evaluation(pid, container_ref, evaluation_ref) do
GenServer.cast(pid, {:forget_evaluation, container_ref, evaluation_ref})
@spec forget_evaluation(pid(), Runtime.locator()) :: :ok
def forget_evaluation(pid, locator) do
GenServer.cast(pid, {:forget_evaluation, locator})
end
@doc """
Terminates the `Evaluator` process belonging to the given container.
Terminates the `Livebook.Evaluator` process that belongs
to the given container.
"""
@spec drop_container(pid(), Evaluator.ref()) :: :ok
@spec drop_container(pid(), Runtime.container_ref()) :: :ok
def drop_container(pid, container_ref) do
GenServer.cast(pid, {:drop_container, container_ref})
end
@doc """
Asynchronously sends completion request for the given `hint` text.
Asynchronously sends completion request for the given
`hint` text.
The completion request is forwarded to `Evaluator` process
belonging to the given `container_ref`. If there's not evaluator,
there's also no binding and environment, so the completion is handled
by a temporary process.
The completion request is forwarded to `Livebook.Evaluator`
process that belongs to the given container. If there's no
evaluator, there's also no binding and environment, so a
generic completion is handled by a temporary process.
See `Livebook.Runtime` for more details.
"""
@spec request_completion_items(
pid(),
pid(),
term(),
String.t(),
Evaluator.ref(),
Evaluator.ref()
) :: :ok
def request_completion_items(pid, send_to, ref, hint, container_ref, evaluation_ref) do
GenServer.cast(
pid,
{:request_completion_items, send_to, ref, hint, container_ref, evaluation_ref}
)
@spec request_completion_items(pid(), pid(), term(), String.t(), Runtime.locator()) :: :ok
def request_completion_items(pid, send_to, completion_ref, hint, locator) do
GenServer.cast(pid, {:request_completion_items, send_to, completion_ref, hint, locator})
end
@doc """
Stops the manager.
This results in all Livebook-related modules being unloaded from this node.
This results in all Livebook-related modules being unloaded
from the runtime node.
"""
@spec stop(pid()) :: :ok
def stop(pid) do
@ -174,11 +161,26 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
end
def handle_cast(
{:evaluate_code, code, container_ref, evaluation_ref, prev_evaluation_ref, opts},
{:evaluate_code, code, {container_ref, evaluation_ref}, prev_locator, opts},
state
) do
state = ensure_evaluator(state, container_ref)
prev_evaluation_ref =
case prev_locator do
{^container_ref, evaluation_ref} ->
evaluation_ref
{parent_container_ref, evaluation_ref} ->
Evaluator.initialize_from(
state.evaluators[container_ref],
state.evaluators[parent_container_ref],
evaluation_ref
)
nil
end
Evaluator.evaluate_code(
state.evaluators[container_ref],
state.owner,
@ -191,7 +193,7 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
{:noreply, state}
end
def handle_cast({:forget_evaluation, container_ref, evaluation_ref}, state) do
def handle_cast({:forget_evaluation, {container_ref, evaluation_ref}}, state) do
with {:ok, evaluator} <- Map.fetch(state.evaluators, container_ref) do
Evaluator.forget_evaluation(evaluator, evaluation_ref)
end
@ -205,7 +207,7 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
end
def handle_cast(
{:request_completion_items, send_to, ref, hint, container_ref, evaluation_ref},
{:request_completion_items, send_to, ref, hint, {container_ref, evaluation_ref}},
state
) do
if evaluator = Map.get(state.evaluators, container_ref) do

View file

@ -130,40 +130,25 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.MixStandalone do
ErlDist.RuntimeServer.stop(runtime.server_pid)
end
def evaluate_code(
runtime,
code,
container_ref,
evaluation_ref,
prev_evaluation_ref,
opts \\ []
) do
ErlDist.RuntimeServer.evaluate_code(
runtime.server_pid,
code,
container_ref,
evaluation_ref,
prev_evaluation_ref,
opts
)
def evaluate_code(runtime, code, locator, prev_locator, opts \\ []) do
ErlDist.RuntimeServer.evaluate_code(runtime.server_pid, code, locator, prev_locator, opts)
end
def forget_evaluation(runtime, container_ref, evaluation_ref) do
ErlDist.RuntimeServer.forget_evaluation(runtime.server_pid, container_ref, evaluation_ref)
def forget_evaluation(runtime, locator) do
ErlDist.RuntimeServer.forget_evaluation(runtime.server_pid, locator)
end
def drop_container(runtime, container_ref) do
ErlDist.RuntimeServer.drop_container(runtime.server_pid, container_ref)
end
def request_completion_items(runtime, send_to, ref, hint, container_ref, evaluation_ref) do
def request_completion_items(runtime, send_to, completion_ref, hint, locator) do
ErlDist.RuntimeServer.request_completion_items(
runtime.server_pid,
send_to,
ref,
completion_ref,
hint,
container_ref,
evaluation_ref
locator
)
end

View file

@ -8,9 +8,40 @@ defmodule Livebook.Session do
# Receives update requests from the clients and notifies
# them of any changes applied to the notebook.
#
# The core concept is the `Data` structure
# ## Collaborative state
#
# The core concept is the `Livebook.Session.Data` structure
# to which we can apply reproducible operations.
# See `Data` for more information.
# See `Livebook.Session.Data` for more information.
#
# ## Evaluation
#
# All regular sections are evaluated in the same process
# (the :main_flow evaluation container). On the other hand,
# each branching section is evaluated in its own process
# and thus runs concurrently.
#
# ### Implementation considerations
#
# In practice, every evaluation container is a `Livebook.Evaluator`
# process, so we have one such process for the main flow and one
# for each branching section. Since a branching section inherits
# the evaluation context from the parent section, the last context
# needs to be copied from the main flow evaluator to the branching
# section evaluator. The latter synchronously asks the former for
# that context using `Livebook.Evaluator.fetch_evaluation_context/3`.
# Consequently, in order to evaluate the first cell in a branching
# section, the main flow needs to be free of work, otherwise we wait.
# This assumptions are mirrored in by `Livebook.Session.Data` when
# determining cells for evaluation.
#
# Note: the context could be copied asynchronously if evaluator
# kept the contexts in its process dictionary, however the other
# evaluator could only read the whole process dictionary, thus
# allocating a lot of memory unnecessarily, which would be unacceptable
# for large data. By making a synchronous request to the evalutor
# for a single specific evaluation context we make sure to copy
# as little memory as necessary.
use GenServer, restart: :temporary
@ -127,6 +158,22 @@ defmodule Livebook.Session do
GenServer.cast(name(session_id), {:insert_section_into, self(), section_id, index})
end
@doc """
Asynchronously sends parent update request to the server.
"""
@spec set_section_parent(id(), Section.id(), Section.id()) :: :ok
def set_section_parent(session_id, section_id, parent_id) do
GenServer.cast(name(session_id), {:set_section_parent, self(), section_id, parent_id})
end
@doc """
Asynchronously sends parent update request to the server.
"""
@spec unset_section_parent(id(), Section.id()) :: :ok
def unset_section_parent(session_id, section_id) do
GenServer.cast(name(session_id), {:unset_section_parent, self(), section_id})
end
@doc """
Asynchronously sends cell insertion request to the server.
"""
@ -380,6 +427,18 @@ defmodule Livebook.Session do
{:noreply, handle_operation(state, operation)}
end
def handle_cast({:set_section_parent, client_pid, section_id, parent_id}, state) do
# Include new id in the operation, so it's reproducible
operation = {:set_section_parent, client_pid, section_id, parent_id}
{:noreply, handle_operation(state, operation)}
end
def handle_cast({:unset_section_parent, client_pid, section_id}, state) do
# Include new id in the operation, so it's reproducible
operation = {:unset_section_parent, client_pid, section_id}
{:noreply, handle_operation(state, operation)}
end
def handle_cast({:insert_cell, client_pid, section_id, index, type}, state) do
# Include new id in the operation, so it's reproducible
operation = {:insert_cell, client_pid, section_id, index, type, Utils.random_id()}
@ -552,10 +611,15 @@ defmodule Livebook.Session do
{:noreply, state}
end
def handle_info({:container_down, :main, message}, state) do
def handle_info({:container_down, container_ref, message}, state) do
broadcast_error(state.session_id, "evaluation process terminated - #{message}")
operation = {:reflect_evaluation_failure, self()}
operation =
case container_ref do
:main_flow -> {:reflect_main_evaluation_failure, self()}
section_id -> {:reflect_evaluation_failure, self(), section_id}
end
{:noreply, handle_operation(state, operation)}
end
@ -647,13 +711,16 @@ defmodule Livebook.Session do
end
end
# Given any operation on `Data`, the process does the following:
# Given any operation on `Livebook.Session.Data`, the process
# does the following:
#
# * broadcasts the operation to all clients immediately,
# so that they can update their local `Data`
# * applies the operation to own local `Data`
# so that they can update their local `Livebook.Session.Data`
#
# * applies the operation to own local `Livebook.Session.Data`
#
# * if necessary, performs the relevant actions (e.g. starts cell evaluation),
# to reflect the new `Data`
# to reflect the new `Livebook.Session.Data`
#
defp handle_operation(state, operation) do
broadcast_operation(state.session_id, operation)
@ -739,33 +806,29 @@ defmodule Livebook.Session do
end
end
defp handle_action(state, {:start_evaluation, cell, _section}) do
prev_ref =
state.data.notebook
|> Notebook.parent_cells_with_section(cell.id)
|> Enum.find_value(fn {cell, _} -> is_struct(cell, Cell.Elixir) && cell.id end)
defp handle_action(state, {:start_evaluation, cell, section}) do
file = (state.data.path || "") <> "#cell"
opts = [file: file]
Runtime.evaluate_code(state.data.runtime, cell.source, :main, cell.id, prev_ref, opts)
locator = {container_ref_for_section(section), cell.id}
prev_locator = find_prev_locator(state.data.notebook, cell, section)
Runtime.evaluate_code(state.data.runtime, cell.source, locator, prev_locator, opts)
evaluation_digest = :erlang.md5(cell.source)
handle_operation(state, {:evaluation_started, self(), cell.id, evaluation_digest})
end
defp handle_action(state, {:stop_evaluation, _section}) do
defp handle_action(state, {:stop_evaluation, section}) do
if state.data.runtime do
Runtime.drop_container(state.data.runtime, :main)
Runtime.drop_container(state.data.runtime, container_ref_for_section(section))
end
state
end
defp handle_action(state, {:forget_evaluation, cell, _section}) do
defp handle_action(state, {:forget_evaluation, cell, section}) do
if state.data.runtime do
Runtime.forget_evaluation(state.data.runtime, :main, cell.id)
Runtime.forget_evaluation(state.data.runtime, {container_ref_for_section(section), cell.id})
end
state
@ -808,4 +871,30 @@ defmodule Livebook.Session do
state
end
end
@doc """
Determines locator of the evaluation that the given
cell depends on.
"""
@spec find_prev_locator(Notebook.t(), Cell.t(), Section.t()) :: Runtime.locator()
def find_prev_locator(notebook, cell, section) do
default =
case section.parent_id do
nil ->
{container_ref_for_section(section), nil}
parent_id ->
{:ok, parent} = Notebook.fetch_section(notebook, parent_id)
{container_ref_for_section(parent), nil}
end
notebook
|> Notebook.parent_cells_with_section(cell.id)
|> Enum.find_value(default, fn {cell, section} ->
is_struct(cell, Cell.Elixir) && {container_ref_for_section(section), cell.id}
end)
end
defp container_ref_for_section(%{parent_id: nil}), do: :main_flow
defp container_ref_for_section(section), do: section.id
end

View file

@ -30,6 +30,7 @@ defmodule Livebook.Session.Data do
alias Livebook.{Notebook, Delta, Runtime, JSInterop}
alias Livebook.Users.User
alias Livebook.Notebook.{Cell, Section}
alias Livebook.Utils.Graph
@type t :: %__MODULE__{
notebook: Notebook.t(),
@ -89,6 +90,8 @@ defmodule Livebook.Session.Data do
@type operation ::
{:insert_section, pid(), index(), Section.id()}
| {:insert_section_into, pid(), Section.id(), index(), Section.id()}
| {:set_section_parent, pid(), Section.id(), parent_id :: Section.id()}
| {:unset_section_parent, pid(), Section.id()}
| {:insert_cell, pid(), Section.id(), index(), Cell.type(), Cell.id()}
| {:delete_section, pid(), Section.id(), delete_cells :: boolean()}
| {:delete_cell, pid(), Cell.id()}
@ -100,7 +103,8 @@ defmodule Livebook.Session.Data do
| {:add_cell_evaluation_output, pid(), Cell.id(), term()}
| {:add_cell_evaluation_response, pid(), Cell.id(), term()}
| {:bind_input, pid(), elixir_cell_id :: Cell.id(), input_cell_id :: Cell.id()}
| {:reflect_evaluation_failure, pid()}
| {:reflect_main_evaluation_failure, pid()}
| {:reflect_evaluation_failure, pid(), Section.id()}
| {:cancel_cell_evaluation, pid(), Cell.id()}
| {:set_notebook_name, pid(), String.t()}
| {:set_section_name, pid(), Section.id(), String.t()}
@ -201,6 +205,40 @@ defmodule Livebook.Session.Data do
end
end
def apply_operation(data, {:set_section_parent, _client_pid, section_id, parent_id}) do
with {:ok, section} <- Notebook.fetch_section(data.notebook, section_id),
{:ok, parent_section} <- Notebook.fetch_section(data.notebook, parent_id),
true <- section.parent_id != parent_id,
[] <- Notebook.child_sections(data.notebook, section.id),
true <- parent_section in Notebook.valid_parents_for(data.notebook, section.id) do
data
|> with_actions()
|> cancel_section_evaluation(section)
|> mark_section_and_dependent_cells_as_stale(section)
|> set_section_parent(section, parent_section)
|> set_dirty()
|> wrap_ok()
else
_ -> :error
end
end
def apply_operation(data, {:unset_section_parent, _client_pid, section_id}) do
with {:ok, section} <- Notebook.fetch_section(data.notebook, section_id),
true <- section.parent_id != nil do
data
|> with_actions()
|> cancel_section_evaluation(section)
|> add_action({:stop_evaluation, section})
|> unset_section_parent(section)
|> mark_section_and_dependent_cells_as_stale(section)
|> set_dirty()
|> wrap_ok()
else
_ -> :error
end
end
def apply_operation(data, {:insert_cell, _client_pid, section_id, index, type, id}) do
with {:ok, _section} <- Notebook.fetch_section(data.notebook, section_id) do
cell = %{Cell.new(type) | id: id}
@ -215,7 +253,8 @@ defmodule Livebook.Session.Data do
def apply_operation(data, {:delete_section, _client_pid, id, delete_cells}) do
with {:ok, section} <- Notebook.fetch_section(data.notebook, id),
true <- section != hd(data.notebook.sections) or delete_cells do
true <- section != hd(data.notebook.sections) or delete_cells,
[] <- Notebook.child_sections(data.notebook, section.id) do
data
|> with_actions()
|> delete_section(section, delete_cells)
@ -264,7 +303,8 @@ defmodule Livebook.Session.Data do
def apply_operation(data, {:move_section, _client_pid, id, offset}) do
with {:ok, section} <- Notebook.fetch_section(data.notebook, id),
true <- offset != 0 do
true <- offset != 0,
true <- Notebook.can_move_section_by?(data.notebook, section, offset) do
data
|> with_actions()
|> move_section(section, offset)
@ -344,13 +384,22 @@ defmodule Livebook.Session.Data do
end
end
def apply_operation(data, {:reflect_evaluation_failure, _client_pid}) do
def apply_operation(data, {:reflect_main_evaluation_failure, _client_pid}) do
data
|> with_actions()
|> clear_evaluation()
|> clear_main_evaluation()
|> wrap_ok()
end
def apply_operation(data, {:reflect_evaluation_failure, _client_pid, section_id}) do
with {:ok, section} <- Notebook.fetch_section(data.notebook, section_id) do
data
|> with_actions()
|> clear_section_evaluation(section)
|> wrap_ok()
end
end
def apply_operation(data, {:cancel_cell_evaluation, _client_pid, id}) do
with {:ok, cell, section} <- Notebook.fetch_cell_and_section(data.notebook, id),
true <- data.cell_infos[cell.id].evaluation_status in [:evaluating, :queued] do
@ -507,6 +556,26 @@ defmodule Livebook.Session.Data do
)
end
defp set_section_parent({data, _} = data_actions, section, parent_section) do
data_actions
|> set!(
notebook:
Notebook.update_section(data.notebook, section.id, fn section ->
%{section | parent_id: parent_section.id}
end)
)
end
defp unset_section_parent({data, _} = data_actions, section) do
data_actions
|> set!(
notebook:
Notebook.update_section(data.notebook, section.id, fn section ->
%{section | parent_id: nil}
end)
)
end
defp insert_cell({data, _} = data_actions, section_id, index, cell) do
data_actions
|> set!(
@ -522,7 +591,13 @@ defmodule Livebook.Session.Data do
data_actions
|> reduce(Enum.reverse(section.cells), &delete_cell(&1, &2, section))
else
data_actions
if section.parent_id do
data_actions
|> unset_section_parent(section)
|> mark_section_and_dependent_cells_as_stale(section)
else
data_actions
end
end
data_actions
@ -590,20 +665,60 @@ defmodule Livebook.Session.Data do
end
defp update_cells_status_after_moved({data, _} = data_actions, prev_notebook) do
cells_with_section_before = Notebook.elixir_cells_with_section(prev_notebook)
cells_with_section_after = Notebook.elixir_cells_with_section(data.notebook)
relevant_cell? = fn cell -> is_struct(cell, Cell.Elixir) or is_struct(cell, Cell.Input) end
graph_before = Notebook.cell_dependency_graph(prev_notebook, cell_filter: relevant_cell?)
graph_after = Notebook.cell_dependency_graph(data.notebook, cell_filter: relevant_cell?)
affected_cells_with_section =
cells_with_section_after
|> Enum.zip(cells_with_section_before)
|> Enum.drop_while(fn {{cell_before, _}, {cell_after, _}} ->
cell_before.id == cell_after.id
# For each path in the dependency graph, find the upmost cell
# which parent changed. From that point downwards all cells
# are invalidated. Then gather invalidated cells from all paths
# and mark as such.
invalidted_cell_ids =
graph_after
|> Graph.leaves()
|> Enum.reduce(MapSet.new(), fn cell_id, invalidated ->
invalidated_on_path(cell_id, graph_after, graph_before, [], [])
|> MapSet.new()
|> MapSet.union(invalidated)
end)
invalidated_cells_with_section =
data.notebook
|> Notebook.elixir_cells_with_section()
|> Enum.filter(fn {cell, _} ->
MapSet.member?(invalidted_cell_ids, cell.id)
end)
|> Enum.map(fn {new, _old} -> new end)
data_actions
|> mark_cells_as_stale(affected_cells_with_section)
|> unqueue_cells_evaluation(affected_cells_with_section)
|> mark_cells_as_stale(invalidated_cells_with_section)
|> unqueue_cells_evaluation(invalidated_cells_with_section)
end
# Traverses path buttom-up looking for the upmost edge with changed parent.
defp invalidated_on_path(child_id, graph_after, graph_before, visited, invalidated)
defp invalidated_on_path(nil, _graph_after, _graph_before, _visited, invalidated),
do: invalidated
defp invalidated_on_path(child_id, graph_after, graph_before, visited, invalidated) do
if graph_after[child_id] == graph_before[child_id] do
invalidated_on_path(
graph_after[child_id],
graph_after,
graph_before,
[child_id | visited],
invalidated
)
else
invalidated_on_path(
graph_after[child_id],
graph_after,
graph_before,
[child_id | visited],
[child_id | visited]
)
end
end
defp queue_cell_evaluation(data_actions, cell, section) do
@ -699,6 +814,20 @@ defmodule Livebook.Session.Data do
|> reduce(invalidated_cells, &set_cell_info!(&1, &2.id, validity_status: :stale))
end
defp mark_section_and_dependent_cells_as_stale(data_actions, section) do
section.cells
|> Enum.find(fn cell -> is_struct(cell, Cell.Elixir) end)
|> case do
nil ->
data_actions
cell ->
data_actions
|> mark_cells_as_stale([{cell, section}])
|> mark_dependent_cells_as_stale(cell)
end
end
defp maybe_start_runtime({data, _} = data_actions, prev_data) do
if data.runtime == nil and not any_cell_queued?(prev_data) and any_cell_queued?(data) do
add_action(data_actions, :start_runtime)
@ -711,45 +840,93 @@ defmodule Livebook.Session.Data do
Enum.any?(data.section_infos, fn {_section_id, info} -> info.evaluation_queue != [] end)
end
# Don't tigger evaluation if we don't have a runtime started yet
defp maybe_evaluate_queued({%{runtime: nil}, _} = data_actions), do: data_actions
defp maybe_evaluate_queued({data, _} = data_actions) do
ongoing_evaluation? =
Enum.any?(data.notebook.sections, fn section ->
data.section_infos[section.id].evaluating_cell_id != nil
end)
main_flow_evaluating? = main_flow_evaluating?(data)
if ongoing_evaluation? or data.runtime == nil do
# Don't tigger evaluation if there is one already,
# or if we simply don't have a runtime started yet
data_actions
else
Enum.find_value(data.notebook.sections, data_actions, fn section ->
case data.section_infos[section.id] do
%{evaluating_cell_id: nil, evaluation_queue: [id | ids]} ->
# The section is idle and has cells queued for evaluation, so let's start the evaluation
cell = Enum.find(section.cells, &(&1.id == id))
{awaiting_branch_sections, awaiting_regular_sections} =
data.notebook.sections
|> Enum.filter(&section_awaits_evaluation?(data, &1.id))
|> Enum.split_with(& &1.parent_id)
data_actions
|> set!(notebook: Notebook.update_cell(data.notebook, id, &%{&1 | outputs: []}))
|> update_cell_info!(id, fn info ->
%{
info
| evaluation_status: :evaluating,
evaluation_digest: nil,
# During evaluation notebook changes may invalidate the cell,
# so we mark it as up-to-date straight away and possibly mark
# it as stale during evaluation
validity_status: :evaluated,
bound_to_input_ids: MapSet.new()
}
end)
|> set_section_info!(section.id, evaluating_cell_id: id, evaluation_queue: ids)
|> add_action({:start_evaluation, cell, section})
data_actions =
reduce(data_actions, awaiting_branch_sections, fn {data, _} = data_actions, section ->
%{evaluation_queue: [id | _]} = data.section_infos[section.id]
_ ->
# The section is neither evaluating nor queued, so let's check the next section
nil
{:ok, parent} = Notebook.fetch_section(data.notebook, section.parent_id)
prev_cell_section =
data.notebook
|> Notebook.parent_cells_with_section(id)
|> Enum.find_value(parent, fn {cell, section} ->
is_struct(cell, Cell.Elixir) && section
end)
prev_section_queued? =
prev_cell_section != nil and
data.section_infos[prev_cell_section.id].evaluation_queue != []
# If evaluating this cell requires interaction with the main flow,
# we keep the cell queued. In case of the Elixir runtimes the
# evaluation context needs to be copied between evaluation processes
# and this requires the main flow to be free of work.
if prev_cell_section != section and (main_flow_evaluating? or prev_section_queued?) do
data_actions
else
evaluate_next_cell_in_section(data_actions, section)
end
end)
if awaiting_regular_sections != [] and not main_flow_evaluating? do
section = hd(awaiting_regular_sections)
evaluate_next_cell_in_section(data_actions, section)
else
data_actions
end
end
defp main_flow_evaluating?(data) do
Enum.any?(data.notebook.sections, fn section ->
section.parent_id == nil and section_evaluating?(data, section.id)
end)
end
defp section_evaluating?(data, section_id) do
info = data.section_infos[section_id]
info.evaluating_cell_id != nil
end
defp section_awaits_evaluation?(data, section_id) do
info = data.section_infos[section_id]
info.evaluating_cell_id == nil and info.evaluation_queue != []
end
defp evaluate_next_cell_in_section({data, _} = data_actions, section) do
case data.section_infos[section.id] do
%{evaluating_cell_id: nil, evaluation_queue: [id | ids]} ->
cell = Enum.find(section.cells, &(&1.id == id))
data_actions
|> set!(notebook: Notebook.update_cell(data.notebook, id, &%{&1 | outputs: []}))
|> update_cell_info!(id, fn info ->
%{
info
| evaluation_status: :evaluating,
evaluation_digest: nil,
# During evaluation notebook changes may invalidate the cell,
# so we mark it as up-to-date straight away and possibly mark
# it as stale during evaluation
validity_status: :evaluated,
bound_to_input_ids: MapSet.new()
}
end)
|> set_section_info!(section.id, evaluating_cell_id: id, evaluation_queue: ids)
|> add_action({:start_evaluation, cell, section})
_ ->
data_actions
end
end
@ -760,11 +937,18 @@ defmodule Livebook.Session.Data do
end)
end
defp clear_evaluation({data, _} = data_actions) do
defp clear_all_evaluation({data, _} = data_actions) do
data_actions
|> reduce(data.notebook.sections, &clear_section_evaluation/2)
end
defp clear_main_evaluation({data, _} = data_actions) do
regular_sections = Enum.filter(data.notebook.sections, &(&1.parent_id == nil))
data_actions
|> reduce(regular_sections, &clear_section_evaluation/2)
end
defp clear_section_evaluation(data_actions, section) do
data_actions
|> set_section_info!(section.id, evaluating_cell_id: nil, evaluation_queue: [])
@ -807,14 +991,19 @@ defmodule Livebook.Session.Data do
case data.cell_infos[cell.id].evaluation_status do
:evaluating ->
data_actions
|> clear_evaluation()
|> then(fn data_actions ->
if section.parent_id do
clear_section_evaluation(data_actions, section)
else
clear_main_evaluation(data_actions)
end
end)
|> add_action({:stop_evaluation, section})
:queued ->
data_actions
|> unqueue_cell_evaluation(cell, section)
|> unqueue_dependent_cells_evaluation(cell)
|> mark_dependent_cells_as_stale(cell)
_ ->
data_actions
@ -838,6 +1027,17 @@ defmodule Livebook.Session.Data do
end)
end
defp cancel_section_evaluation({data, _} = data_actions, section) do
case data.section_infos[section.id] do
%{evaluating_cell_id: nil} ->
data_actions
%{evaluating_cell_id: evaluating_cell_id} ->
cell = Enum.find(section.cells, &(&1.id == evaluating_cell_id))
cancel_cell_evaluation(data_actions, cell, section)
end
end
defp set_notebook_name({data, _} = data_actions, name) do
data_actions
|> set!(notebook: %{data.notebook | name: name})
@ -970,7 +1170,7 @@ defmodule Livebook.Session.Data do
if prev_data.runtime == nil and data.runtime != nil do
maybe_evaluate_queued(data_actions)
else
clear_evaluation(data_actions)
clear_all_evaluation(data_actions)
end
end
@ -1085,15 +1285,6 @@ defmodule Livebook.Session.Data do
set!(data_actions, dirty: dirty)
end
@doc """
Finds the cell that's currently being evaluated in the given section.
"""
@spec get_evaluating_cell_id(t(), Section.id()) :: Cell.id() | nil
def get_evaluating_cell_id(data, section_id) do
info = data.section_infos[section_id]
info && info.evaluating_cell_id
end
@doc """
Find child cells bound to the given input cell.
"""

View file

@ -0,0 +1,42 @@
defmodule Livebook.Utils.Graph do
@moduledoc false
@typedoc """
A bottom-up graph representation encoded as a map
of child-to-parent entries.
"""
@type t() :: %{node_id => node_id | nil}
@type t(node_id) :: %{node_id => node_id | nil}
@type node_id :: term()
@doc """
Finds a path between nodes `from_id` and `to_id`.
If the path exists, a top-down list of nodes is
returned including the extreme nodes. Otherwise,
an empty list is returned.
"""
@spec find_path(t(), node_id(), node_id()) :: list(node_id())
def find_path(graph, from_id, to_id) do
find_path(graph, from_id, to_id, [])
end
defp find_path(_graph, to_id, to_id, path), do: [to_id | path]
defp find_path(_graph, nil, _to_id, _path), do: []
defp find_path(graph, from_id, to_id, path),
do: find_path(graph, graph[from_id], to_id, [from_id | path])
@doc """
Finds grpah leave nodes, that is, nodes with
no children.
"""
@spec leaves(t()) :: list(node_id())
def leaves(graph) do
children = MapSet.new(graph, fn {key, _} -> key end)
parents = MapSet.new(graph, fn {_, value} -> value end)
MapSet.difference(children, parents) |> MapSet.to_list()
end
end

View file

@ -104,7 +104,7 @@ defmodule LivebookWeb.SessionLive do
current_user={@current_user}
path={Routes.session_path(@socket, :user, @session_id)} />
</SidebarHelpers.sidebar>
<div class="flex flex-col h-full w-full max-w-xs absolute z-30 top-0 left-[64px] shadow-xl md:static md:shadow-none overflow-y-auto bg-gray-50 border-r border-gray-100 px-6 py-10"
<div class="flex flex-col h-full w-full max-w-xs absolute z-30 top-0 left-[64px] shadow-xl md:static md:shadow-none bg-gray-50 border-r border-gray-100 px-6 py-10"
data-element="side-panel">
<div data-element="sections-list">
<div class="flex-grow flex flex-col">
@ -113,10 +113,15 @@ defmodule LivebookWeb.SessionLive do
</h3>
<div class="mt-4 flex flex-col space-y-4">
<%= for section_item <- @data_view.sections_items do %>
<button class="text-left hover:text-gray-900 text-gray-500"
<button class="text-left hover:text-gray-900 text-gray-500 flex items-center space-x-1"
data-element="sections-list-item"
data-section-id={section_item.id}>
<%= section_item.name %>
<span><%= section_item.name %></span>
<%= if section_item.parent do %>
<span class="tooltip right" aria-label={"Branches from\n#{section_item.parent.name}"}>
<.remix_icon icon="git-branch-line" class="text-lg font-normal flip-horizontally leading-none" />
</span>
<% end %>
</button>
<% end %>
</div>
@ -398,6 +403,22 @@ defmodule LivebookWeb.SessionLive do
{:noreply, socket}
end
def handle_event(
"set_section_parent",
%{"section_id" => section_id, "parent_id" => parent_id},
socket
) do
Session.set_section_parent(socket.assigns.session_id, section_id, parent_id)
{:noreply, socket}
end
def handle_event("unset_section_parent", %{"section_id" => section_id}, socket) do
Session.unset_section_parent(socket.assigns.session_id, section_id)
{:noreply, socket}
end
def handle_event(
"insert_cell",
%{"section_id" => section_id, "index" => index, "type" => type},
@ -595,17 +616,13 @@ defmodule LivebookWeb.SessionLive do
def handle_event("completion_request", %{"hint" => hint, "cell_id" => cell_id}, socket) do
data = socket.private.data
with {:ok, cell, _section} <- Notebook.fetch_cell_and_section(data.notebook, cell_id) do
with {:ok, cell, section} <- Notebook.fetch_cell_and_section(data.notebook, cell_id) do
if data.runtime do
prev_ref =
data.notebook
|> Notebook.parent_cells_with_section(cell.id)
|> Enum.find_value(fn {cell, _} -> is_struct(cell, Cell.Elixir) && cell.id end)
completion_ref = make_ref()
prev_locator = Session.find_prev_locator(data.notebook, cell, section)
Runtime.request_completion_items(data.runtime, self(), completion_ref, hint, prev_locator)
ref = make_ref()
Runtime.request_completion_items(data.runtime, self(), ref, hint, :main, prev_ref)
{:reply, %{"completion_ref" => inspect(ref)}, socket}
{:reply, %{"completion_ref" => inspect(completion_ref)}, socket}
else
{:reply, %{"completion_ref" => nil},
put_flash(
@ -1009,7 +1026,11 @@ defmodule LivebookWeb.SessionLive do
notebook_name: data.notebook.name,
sections_items:
for section <- data.notebook.sections do
%{id: section.id, name: section.name}
%{
id: section.id,
name: section.name,
parent: parent_section_view(section.parent_id, data)
}
end,
clients:
data.clients_map
@ -1057,11 +1078,24 @@ defmodule LivebookWeb.SessionLive do
id: section.id,
html_id: html_id,
name: section.name,
parent: parent_section_view(section.parent_id, data),
has_children?: Notebook.child_sections(data.notebook, section.id) != [],
valid_parents:
for parent <- Notebook.valid_parents_for(data.notebook, section.id) do
%{id: parent.id, name: parent.name}
end,
cell_views: Enum.map(section.cells, &cell_to_view(&1, data))
}
end)
end
defp parent_section_view(nil, _data), do: nil
defp parent_section_view(parent_id, data) do
{:ok, section} = Notebook.fetch_section(data.notebook, parent_id)
%{id: section.id, name: section.name}
end
defp cell_to_view(%Cell.Elixir{} = cell, data) do
info = data.cell_infos[cell.id]

View file

@ -23,6 +23,35 @@ defmodule LivebookWeb.SessionLive.SectionComponent do
<.remix_icon icon="link" class="text-xl" />
</a>
</span>
<%= if @section_view.valid_parents != [] and not @section_view.has_children? do %>
<div class="relative" id={"section-#{@section_view.id}-branch-menu"} phx-hook="Menu" data-element="menu">
<span class="tooltip top" aria-label="Branch out from">
<button class="icon-button" data-toggle>
<.remix_icon icon="git-branch-line" class="text-xl flip-horizontally" />
</button>
</span>
<div class="menu" data-content>
<%= for parent <- @section_view.valid_parents do %>
<%= if @section_view.parent && @section_view.parent.id == parent.id do %>
<button class="menu__item text-gray-900"
phx-click="unset_section_parent"
phx-value-section_id={@section_view.id}>
<.remix_icon icon="arrow-right-s-line" />
<span class="font-medium"><%= parent.name %></span>
</button>
<% else %>
<button class="menu__item text-gray-500"
phx-click="set_section_parent"
phx-value-section_id={@section_view.id}
phx-value-parent_id={parent.id}>
<.remix_icon icon="arrow-right-s-line" />
<span class="font-medium"><%= parent.name %></span>
</button>
<% end %>
<% end %>
</div>
</div>
<% end %>
<span class="tooltip top" aria-label="Move up">
<button class="icon-button"
phx-click="move_section"
@ -39,14 +68,24 @@ defmodule LivebookWeb.SessionLive.SectionComponent do
<.remix_icon icon="arrow-down-s-line" class="text-xl" />
</button>
</span>
<span class="tooltip top" aria-label="Delete">
<%= live_patch to: Routes.session_path(@socket, :delete_section, @session_id, @section_view.id),
class: "icon-button" do %>
<.remix_icon icon="delete-bin-6-line" class="text-xl" />
<% end %>
</span>
<%= unless @section_view.has_children? do %>
<span class="tooltip top" aria-label="Delete">
<%= live_patch to: Routes.session_path(@socket, :delete_section, @session_id, @section_view.id),
class: "icon-button" do %>
<.remix_icon icon="delete-bin-6-line" class="text-xl" />
<% end %>
</span>
<% end %>
</div>
</div>
<%= if @section_view.parent do %>
<h3 class="mt-1 flex items-end space-x-1 text-sm font-semibold text-gray-800">
<span class="tooltip bottom" aria-label={"This section branches out from the main flow\nand can be evaluated in parallel"}>
<.remix_icon icon="git-branch-line" class="text-lg font-normal flip-horizontally leading-none" />
</span>
<span class="leading-none">from <%= @section_view.parent.name %></span>
</h3>
<% end %>
<div class="container">
<div class="flex flex-col space-y-1">
<%= for {cell_view, index} <- Enum.with_index(@section_view.cell_views) do %>

View file

@ -4,7 +4,7 @@ defmodule Livebook.EvaluatorTest do
alias Livebook.Evaluator
setup do
{:ok, evaluator} = Evaluator.start_link()
evaluator = start_supervised!(Evaluator)
%{evaluator: evaluator}
end
@ -77,9 +77,8 @@ defmodule Livebook.EvaluatorTest do
[{List, :first, _arity, _location}]}, %{evaluation_time_ms: _time_ms}}
end
test "in case of an error returns only the relevant part of stacktrace", %{
evaluator: evaluator
} do
test "in case of an error returns only the relevant part of stacktrace",
%{evaluator: evaluator} do
code = """
defmodule Livebook.EvaluatorTest.Stacktrace.Math do
def bad_math do
@ -244,6 +243,35 @@ defmodule Livebook.EvaluatorTest do
end
end
describe "initialize_from/3" do
setup do
parent_evaluator = start_supervised!(Evaluator, id: :parent_evaluator)
%{parent_evaluator: parent_evaluator}
end
test "copies the given context and sets as the initial one",
%{evaluator: evaluator, parent_evaluator: parent_evaluator} do
Evaluator.evaluate_code(parent_evaluator, self(), "x = 1", :code_1)
assert_receive {:evaluation_response, :code_1, _, %{evaluation_time_ms: _time_ms}}
Evaluator.initialize_from(evaluator, parent_evaluator, :code_1)
Evaluator.evaluate_code(evaluator, self(), "x", :code_2)
assert_receive {:evaluation_response, :code_2, {:ok, 1}, %{evaluation_time_ms: _time_ms}}
end
test "mirrors process dictionary of the given evaluator",
%{evaluator: evaluator, parent_evaluator: parent_evaluator} do
Evaluator.evaluate_code(parent_evaluator, self(), "Process.put(:data, 1)", :code_1)
assert_receive {:evaluation_response, :code_1, _, %{evaluation_time_ms: _time_ms}}
Evaluator.initialize_from(evaluator, parent_evaluator, :code_1)
Evaluator.evaluate_code(evaluator, self(), "Process.get(:data)", :code_2)
assert_receive {:evaluation_response, :code_2, {:ok, 1}, %{evaluation_time_ms: _time_ms}}
end
end
# Helpers
# Some of the code passed to Evaluator above is expected

View file

@ -44,7 +44,8 @@ defmodule Livebook.LiveMarkdown.ExportTest do
},
%{
Notebook.Section.new()
| name: "Section 2",
| id: "s2",
name: "Section 2",
metadata: %{},
cells: [
%{
@ -58,7 +59,7 @@ defmodule Livebook.LiveMarkdown.ExportTest do
Notebook.Cell.new(:elixir)
| metadata: %{},
source: """
IO.gets("length: ")
IO.gets("length: ")\
"""
},
%{
@ -69,6 +70,21 @@ defmodule Livebook.LiveMarkdown.ExportTest do
props: %{min: 50, max: 150, step: 2}
}
]
},
%{
Notebook.Section.new()
| name: "Section 3",
metadata: %{},
parent_id: "s2",
cells: [
%{
Notebook.Cell.new(:elixir)
| metadata: %{},
source: """
Process.info()\
"""
}
]
}
]
}
@ -107,6 +123,14 @@ defmodule Livebook.LiveMarkdown.ExportTest do
```
<!-- livebook:{"livebook_object":"cell_input","name":"length","props":{"max":150,"min":50,"step":2},"type":"range","value":"100"} -->
<!-- livebook:{"branch_parent_index":1} -->
## Section 3
```elixir
Process.info()
```
"""
document = Export.notebook_to_markdown(notebook)

View file

@ -40,6 +40,14 @@ defmodule Livebook.LiveMarkdown.ImportTest do
```
<!-- livebook:{"livebook_object":"cell_input","name":"length","props":{"max":150,"min":50,"step":2},"type":"range","value":"100"} -->
<!-- livebook:{"branch_parent_index":1} -->
## Section 3
```elixir
Process.info()
```
"""
{notebook, []} = Import.notebook_from_markdown(markdown)
@ -79,6 +87,7 @@ defmodule Livebook.LiveMarkdown.ImportTest do
]
},
%Notebook.Section{
id: section2_id,
name: "Section 2",
metadata: %{},
cells: [
@ -103,6 +112,19 @@ defmodule Livebook.LiveMarkdown.ImportTest do
props: %{min: 50, max: 150, step: 2}
}
]
},
%Notebook.Section{
name: "Section 3",
metadata: %{},
parent_id: section2_id,
cells: [
%Cell.Elixir{
metadata: %{},
source: """
Process.info()\
"""
}
]
}
]
} = notebook

View file

@ -64,6 +64,197 @@ defmodule Livebook.NotebookTest do
end
end
describe "cell_dependency_graph/1" do
test "computes a linear graph for regular sections" do
notebook = %{
Notebook.new()
| sections: [
%{
Section.new()
| id: "s1",
cells: [
%{Cell.new(:markdown) | id: "c1"},
%{Cell.new(:elixir) | id: "c2"}
]
},
%{
Section.new()
| id: "s2",
cells: [
%{Cell.new(:input) | id: "c3"},
%{Cell.new(:elixir) | id: "c4"}
]
}
]
}
assert Notebook.cell_dependency_graph(notebook) == %{
"c4" => "c3",
"c3" => "c2",
"c2" => "c1",
"c1" => nil
}
end
test "ignores empty sections" do
notebook = %{
Notebook.new()
| sections: [
%{Section.new() | id: "s1", cells: [%{Cell.new(:elixir) | id: "c1"}]},
%{Section.new() | id: "s2", cells: []},
%{Section.new() | id: "s3", cells: [%{Cell.new(:elixir) | id: "c2"}]}
]
}
assert Notebook.cell_dependency_graph(notebook) == %{
"c2" => "c1",
"c1" => nil
}
end
test "computes a non-linear graph if there are branching sections" do
notebook = %{
Notebook.new()
| sections: [
%{
Section.new()
| id: "s1",
cells: [
%{Cell.new(:elixir) | id: "c1"},
%{Cell.new(:elixir) | id: "c2"}
]
},
%{
Section.new()
| id: "s2",
parent_id: "s1",
cells: [
%{Cell.new(:elixir) | id: "c3"},
%{Cell.new(:elixir) | id: "c4"}
]
},
%{
Section.new()
| id: "s3",
cells: [
%{Cell.new(:elixir) | id: "c5"},
%{Cell.new(:elixir) | id: "c6"}
]
}
]
}
assert Notebook.cell_dependency_graph(notebook) == %{
"c6" => "c5",
"c5" => "c2",
"c4" => "c3",
"c3" => "c2",
"c2" => "c1",
"c1" => nil
}
end
test "handles branching sections pointing to empty sections" do
notebook = %{
Notebook.new()
| sections: [
%{
Section.new()
| id: "s1",
cells: [
%{Cell.new(:elixir) | id: "c1"}
]
},
%{
Section.new()
| id: "s2",
cells: []
},
%{
Section.new()
| id: "s3",
parent_id: "s2",
cells: [
%{Cell.new(:elixir) | id: "c2"}
]
}
]
}
assert Notebook.cell_dependency_graph(notebook) == %{
"c2" => "c1",
"c1" => nil
}
end
test "handles branching sections placed further in the notebook" do
notebook = %{
Notebook.new()
| sections: [
%{
Section.new()
| id: "s1",
cells: [
%{Cell.new(:elixir) | id: "c1"}
]
},
%{
Section.new()
| id: "s2",
cells: [
%{Cell.new(:elixir) | id: "c2"}
]
},
%{
Section.new()
| id: "s3",
parent_id: "s1",
cells: [
%{Cell.new(:elixir) | id: "c3"}
]
},
%{
Section.new()
| id: "s4",
cells: [
%{Cell.new(:elixir) | id: "c4"}
]
}
]
}
assert Notebook.cell_dependency_graph(notebook) == %{
"c4" => "c2",
"c3" => "c1",
"c2" => "c1",
"c1" => nil
}
end
test "given :cell_filter option, includes only the matching cells" do
notebook = %{
Notebook.new()
| sections: [
%{
Section.new()
| id: "s1",
cells: [
%{Cell.new(:elixir) | id: "c1"},
%{Cell.new(:markdown) | id: "c2"},
%{Cell.new(:elixir) | id: "c3"}
]
}
]
}
assert Notebook.cell_dependency_graph(notebook, cell_filter: &is_struct(&1, Cell.Elixir)) ==
%{
"c3" => "c1",
"c1" => nil
}
end
end
describe "input_cell_for_prompt/3" do
test "returns an error if no input matches the given prompt" do
cell1 = Cell.new(:elixir)

View file

@ -35,36 +35,31 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServerTest do
end
end
describe "evaluate_code/6" do
describe "evaluate_code/5" do
test "spawns a new evaluator when necessary", %{pid: pid} do
RuntimeServer.evaluate_code(pid, "1 + 1", :container1, :evaluation1, nil)
RuntimeServer.evaluate_code(pid, "1 + 1", {:c1, :e1}, {:c1, nil})
assert_receive {:evaluation_response, :evaluation1, _, %{evaluation_time_ms: _time_ms}}
assert_receive {:evaluation_response, :e1, _, %{evaluation_time_ms: _time_ms}}
end
test "prevents from module redefinition warning being printed to standard error", %{pid: pid} do
stderr =
ExUnit.CaptureIO.capture_io(:stderr, fn ->
RuntimeServer.evaluate_code(pid, "defmodule Foo do end", :container1, :evaluation1, nil)
RuntimeServer.evaluate_code(pid, "defmodule Foo do end", :container1, :evaluation2, nil)
code = "defmodule Foo do end"
RuntimeServer.evaluate_code(pid, code, {:c1, :e1}, {:c1, nil})
RuntimeServer.evaluate_code(pid, code, {:c1, :e2}, {:c1, nil})
assert_receive {:evaluation_response, :evaluation1, _, %{evaluation_time_ms: _time_ms}}
assert_receive {:evaluation_response, :evaluation2, _, %{evaluation_time_ms: _time_ms}}
assert_receive {:evaluation_response, :e1, _, %{evaluation_time_ms: _time_ms}}
assert_receive {:evaluation_response, :e2, _, %{evaluation_time_ms: _time_ms}}
end)
assert stderr == ""
end
test "proxies evaluation stderr to evaluation stdout", %{pid: pid} do
RuntimeServer.evaluate_code(
pid,
~s{IO.puts(:stderr, "error")},
:container1,
:evaluation1,
nil
)
RuntimeServer.evaluate_code(pid, ~s{IO.puts(:stderr, "error")}, {:c1, :e1}, {:c1, nil})
assert_receive {:evaluation_output, :evaluation1, "error\n"}
assert_receive {:evaluation_output, :e1, "error\n"}
end
@tag capture_log: true
@ -74,24 +69,79 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServerTest do
Logger.error("hey")
"""
RuntimeServer.evaluate_code(pid, code, :container1, :evaluation1, nil)
RuntimeServer.evaluate_code(pid, code, {:c1, :e1}, {:c1, nil})
assert_receive {:evaluation_output, :evaluation1, log_message}
assert_receive {:evaluation_output, :e1, log_message}
assert log_message =~ "[error] hey"
end
test "supports cross-container evaluation context references", %{pid: pid} do
RuntimeServer.evaluate_code(pid, "x = 1", {:c1, :e1}, {:c1, nil})
assert_receive {:evaluation_response, :e1, _, %{evaluation_time_ms: _time_ms}}
RuntimeServer.evaluate_code(pid, "x", {:c2, :e2}, {:c1, :e1})
assert_receive {:evaluation_response, :e2, {:text, "\e[34m1\e[0m"},
%{evaluation_time_ms: _time_ms}}
end
test "evaluates code in different containers in parallel", %{pid: pid} do
# Start a process that waits for two joins and only then
# sends a response back to the callers and terminates
code = """
loop = fn loop, state ->
receive do
{:join, caller} ->
state = update_in(state.count, &(&1 + 1))
state = update_in(state.callers, &[caller | &1])
if state.count < 2 do
loop.(loop, state)
else
for caller <- state.callers do
send(caller, :join_ack)
end
end
end
end
pid = spawn(fn -> loop.(loop, %{callers: [], count: 0}) end)
"""
RuntimeServer.evaluate_code(pid, code, {:c1, :e1}, {:c1, nil})
assert_receive {:evaluation_response, :e1, _, %{evaluation_time_ms: _time_ms}}
await_code = """
send(pid, {:join, self()})
receive do
:join_ack -> :ok
end
"""
# Note: it's important to first start evaluation in :c2,
# because it needs to copy evaluation context from :c1
RuntimeServer.evaluate_code(pid, await_code, {:c2, :e2}, {:c1, :e1})
RuntimeServer.evaluate_code(pid, await_code, {:c1, :e3}, {:c1, :e1})
assert_receive {:evaluation_response, :e2, _, %{evaluation_time_ms: _time_ms}}
assert_receive {:evaluation_response, :e3, _, %{evaluation_time_ms: _time_ms}}
end
end
describe "request_completion_items/6" do
test "provides basic completion when no evaluation reference is given", %{pid: pid} do
RuntimeServer.request_completion_items(pid, self(), :comp_ref, "System.ver", nil, nil)
RuntimeServer.request_completion_items(pid, self(), :comp_ref, "System.ver", {:c1, nil})
assert_receive {:completion_response, :comp_ref, [%{label: "version/0"}]}
end
test "provides extended completion when previous evaluation reference is given", %{pid: pid} do
RuntimeServer.evaluate_code(pid, "number = 10", :c1, :e1, nil)
RuntimeServer.evaluate_code(pid, "number = 10", {:c1, :e1}, {:c1, nil})
assert_receive {:evaluation_response, :e1, _, %{evaluation_time_ms: _time_ms}}
RuntimeServer.request_completion_items(pid, self(), :comp_ref, "num", :c1, :e1)
RuntimeServer.request_completion_items(pid, self(), :comp_ref, "num", {:c1, :e1})
assert_receive {:completion_response, :comp_ref, [%{label: "number"}]}
end
@ -102,9 +152,9 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServerTest do
spawn_link(fn -> Process.exit(self(), :kill) end)
"""
RuntimeServer.evaluate_code(pid, code, :container1, :evaluation1, nil)
RuntimeServer.evaluate_code(pid, code, {:c1, :e1}, {:c1, nil})
assert_receive {:container_down, :container1, message}
assert_receive {:container_down, :c1, message}
assert message =~ "killed"
end
end

File diff suppressed because it is too large Load diff

View file

@ -11,10 +11,10 @@ defmodule Livebook.Runtime.NoopRuntime do
defimpl Livebook.Runtime do
def connect(_), do: :ok
def disconnect(_), do: :ok
def evaluate_code(_, _, _, _, _, _ \\ []), do: :ok
def forget_evaluation(_, _, _), do: :ok
def evaluate_code(_, _, _, _, _ \\ []), do: :ok
def forget_evaluation(_, _), do: :ok
def drop_container(_, _), do: :ok
def request_completion_items(_, _, _, _, _, _), do: :ok
def request_completion_items(_, _, _, _, _), do: :ok
def duplicate(_), do: {:ok, Livebook.Runtime.NoopRuntime.new()}
end
end