mirror of
https://github.com/livebook-dev/livebook.git
synced 2025-03-09 21:37:42 +08:00
Migrate the Evaluator process from GenServer to a regular process (#502)
* Migrate the Evaluator process from GenServer to a regular process * Update CHANGELOG * Attach reference to every evaluator and pass in internal messages * Use start_supervised/2 in tests * Apply review comments
This commit is contained in:
parent
9822735bcf
commit
f91c71bf3a
5 changed files with 78 additions and 30 deletions
|
@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
|
|||
### Fixed
|
||||
|
||||
- Improved Markdown and math integration by migrating to remark ([#495](https://github.com/livebook-dev/livebook/pull/495))
|
||||
- Improved the evaluator process to not consume user-submitted messages from inbox ([#502](https://github.com/livebook-dev/livebook/pull/502))
|
||||
|
||||
## [v0.2.3](https://github.com/livebook-dev/livebook/tree/v0.2.3) (2021-08-12)
|
||||
|
||||
|
|
|
@ -10,14 +10,17 @@ defmodule Livebook.Evaluator do
|
|||
# It's important to store the binding in the same process
|
||||
# where the evaluation happens, as otherwise we would have to
|
||||
# send them between processes, effectively copying potentially large data.
|
||||
|
||||
use GenServer, restart: :temporary
|
||||
#
|
||||
# Note that this process is intentionally not a GenServer,
|
||||
# because we during evaluation we may receive arbitrary
|
||||
# messages and we don't want to consume them from the inbox,
|
||||
# as GenServer does.
|
||||
|
||||
require Logger
|
||||
|
||||
alias Livebook.Evaluator
|
||||
|
||||
@type t :: GenServer.server()
|
||||
@type t :: %{pid: pid(), ref: reference()}
|
||||
|
||||
@type state :: %{
|
||||
formatter: module(),
|
||||
|
@ -57,8 +60,12 @@ defmodule Livebook.Evaluator do
|
|||
* `formatter` - a module implementing the `Livebook.Evaluator.Formatter` behaviour,
|
||||
used for transforming evaluation response before it's sent to the client
|
||||
"""
|
||||
@spec start_link(keyword()) :: {:ok, pid(), t()} | {:error, term()}
|
||||
def start_link(opts \\ []) do
|
||||
GenServer.start_link(__MODULE__, opts)
|
||||
case :proc_lib.start_link(__MODULE__, :init, [opts]) do
|
||||
{:error, error} -> {:error, error}
|
||||
evaluator -> {:ok, evaluator.pid, evaluator}
|
||||
end
|
||||
end
|
||||
|
||||
@doc """
|
||||
|
@ -80,7 +87,7 @@ defmodule Livebook.Evaluator do
|
|||
"""
|
||||
@spec evaluate_code(t(), pid(), String.t(), ref(), ref() | nil, keyword()) :: :ok
|
||||
def evaluate_code(evaluator, send_to, code, ref, prev_ref \\ nil, opts \\ []) when ref != nil do
|
||||
GenServer.cast(evaluator, {:evaluate_code, send_to, code, ref, prev_ref, opts})
|
||||
cast(evaluator, {:evaluate_code, send_to, code, ref, prev_ref, opts})
|
||||
end
|
||||
|
||||
@doc """
|
||||
|
@ -96,7 +103,8 @@ defmodule Livebook.Evaluator do
|
|||
{:ok, context()} | {:error, :not_modified}
|
||||
def fetch_evaluation_context(evaluator, ref, opts \\ []) do
|
||||
cached_id = opts[:cached_id]
|
||||
GenServer.call(evaluator, {:fetch_evaluation_context, ref, cached_id})
|
||||
|
||||
call(evaluator, {:fetch_evaluation_context, ref, cached_id})
|
||||
end
|
||||
|
||||
@doc """
|
||||
|
@ -107,7 +115,7 @@ defmodule Livebook.Evaluator do
|
|||
"""
|
||||
@spec initialize_from(t(), t(), ref()) :: :ok
|
||||
def initialize_from(evaluator, source_evaluator, source_evaluation_ref) do
|
||||
GenServer.call(evaluator, {:initialize_from, source_evaluator, source_evaluation_ref})
|
||||
call(evaluator, {:initialize_from, source_evaluator, source_evaluation_ref})
|
||||
end
|
||||
|
||||
@doc """
|
||||
|
@ -116,7 +124,7 @@ defmodule Livebook.Evaluator do
|
|||
"""
|
||||
@spec forget_evaluation(t(), ref()) :: :ok
|
||||
def forget_evaluation(evaluator, ref) do
|
||||
GenServer.cast(evaluator, {:forget_evaluation, ref})
|
||||
cast(evaluator, {:forget_evaluation, ref})
|
||||
end
|
||||
|
||||
@doc """
|
||||
|
@ -134,12 +142,34 @@ defmodule Livebook.Evaluator do
|
|||
ref() | nil
|
||||
) :: :ok
|
||||
def handle_intellisense(evaluator, send_to, ref, request, evaluation_ref \\ nil) do
|
||||
GenServer.cast(evaluator, {:handle_intellisense, send_to, ref, request, evaluation_ref})
|
||||
cast(evaluator, {:handle_intellisense, send_to, ref, request, evaluation_ref})
|
||||
end
|
||||
|
||||
defp cast(evaluator, message) do
|
||||
send(evaluator.pid, {:cast, evaluator.ref, message})
|
||||
:ok
|
||||
end
|
||||
|
||||
defp call(evaluator, message) do
|
||||
call_ref = make_ref()
|
||||
send(evaluator.pid, {:call, evaluator.ref, self(), call_ref, message})
|
||||
|
||||
receive do
|
||||
{^call_ref, reply} -> reply
|
||||
end
|
||||
end
|
||||
|
||||
## Callbacks
|
||||
|
||||
@impl true
|
||||
def child_spec(opts) do
|
||||
%{
|
||||
id: __MODULE__,
|
||||
start: {__MODULE__, :start_link, [opts]},
|
||||
type: :worker,
|
||||
restart: :temporary
|
||||
}
|
||||
end
|
||||
|
||||
def init(opts) do
|
||||
formatter = Keyword.get(opts, :formatter, Evaluator.IdentityFormatter)
|
||||
|
||||
|
@ -148,11 +178,19 @@ defmodule Livebook.Evaluator do
|
|||
# Use the dedicated IO device as the group leader,
|
||||
# so that it handles all :stdio operations.
|
||||
Process.group_leader(self(), io_proxy)
|
||||
{:ok, initial_state(formatter, io_proxy)}
|
||||
|
||||
evaluator_ref = make_ref()
|
||||
state = initial_state(evaluator_ref, formatter, io_proxy)
|
||||
evaluator = %{pid: self(), ref: evaluator_ref}
|
||||
|
||||
:proc_lib.init_ack(evaluator)
|
||||
|
||||
loop(state)
|
||||
end
|
||||
|
||||
defp initial_state(formatter, io_proxy) do
|
||||
defp initial_state(evaluator_ref, formatter, io_proxy) do
|
||||
%{
|
||||
evaluator_ref: evaluator_ref,
|
||||
formatter: formatter,
|
||||
io_proxy: io_proxy,
|
||||
contexts: %{},
|
||||
|
@ -162,13 +200,25 @@ defmodule Livebook.Evaluator do
|
|||
}
|
||||
end
|
||||
|
||||
defp loop(%{evaluator_ref: evaluator_ref} = state) do
|
||||
receive do
|
||||
{:call, ^evaluator_ref, pid, ref, message} ->
|
||||
{:reply, reply, state} = handle_call(message, pid, state)
|
||||
send(pid, {ref, reply})
|
||||
loop(state)
|
||||
|
||||
{:cast, ^evaluator_ref, message} ->
|
||||
{:noreply, state} = handle_cast(message, state)
|
||||
loop(state)
|
||||
end
|
||||
end
|
||||
|
||||
defp initial_context() do
|
||||
env = :elixir.env_for_eval([])
|
||||
%{binding: [], env: env, id: random_id()}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_cast({:evaluate_code, send_to, code, ref, prev_ref, opts}, state) do
|
||||
defp handle_cast({:evaluate_code, send_to, code, ref, prev_ref, opts}, state) do
|
||||
Evaluator.IOProxy.configure(state.io_proxy, send_to, ref)
|
||||
|
||||
context = get_context(state, prev_ref)
|
||||
|
@ -205,7 +255,7 @@ defmodule Livebook.Evaluator do
|
|||
{:noreply, state}
|
||||
end
|
||||
|
||||
def handle_cast({:forget_evaluation, ref}, state) do
|
||||
defp handle_cast({:forget_evaluation, ref}, state) do
|
||||
state =
|
||||
state
|
||||
|> Map.update!(:contexts, &Map.delete(&1, ref))
|
||||
|
@ -214,7 +264,7 @@ defmodule Livebook.Evaluator do
|
|||
{:noreply, state}
|
||||
end
|
||||
|
||||
def handle_cast({:handle_intellisense, send_to, ref, request, evaluation_ref}, state) do
|
||||
defp handle_cast({:handle_intellisense, send_to, ref, request, evaluation_ref}, state) do
|
||||
context = get_context(state, evaluation_ref)
|
||||
|
||||
# Safely rescue from intellisense errors
|
||||
|
@ -230,8 +280,7 @@ defmodule Livebook.Evaluator do
|
|||
{:noreply, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:fetch_evaluation_context, ref, cached_id}, _from, state) do
|
||||
defp handle_call({:fetch_evaluation_context, ref, cached_id}, _from, state) do
|
||||
context = get_context(state, ref)
|
||||
|
||||
reply =
|
||||
|
@ -244,7 +293,7 @@ defmodule Livebook.Evaluator do
|
|||
{:reply, reply, state}
|
||||
end
|
||||
|
||||
def handle_call({:initialize_from, source_evaluator, source_evaluation_ref}, _from, state) do
|
||||
defp handle_call({:initialize_from, source_evaluator, source_evaluation_ref}, _from, state) do
|
||||
state =
|
||||
case Evaluator.fetch_evaluation_context(
|
||||
source_evaluator,
|
||||
|
@ -312,8 +361,8 @@ defmodule Livebook.Evaluator do
|
|||
:crypto.strong_rand_bytes(20) |> Base.encode32(case: :lower)
|
||||
end
|
||||
|
||||
defp copy_process_dictionary_from(pid) do
|
||||
{:dictionary, dictionary} = Process.info(pid, :dictionary)
|
||||
defp copy_process_dictionary_from(source_evaluator) do
|
||||
{:dictionary, dictionary} = Process.info(source_evaluator.pid, :dictionary)
|
||||
|
||||
for {key, value} <- dictionary, not internal_dictionary_key?(key) do
|
||||
Process.put(key, value)
|
||||
|
|
|
@ -26,9 +26,7 @@ defmodule Livebook.Runtime.ErlDist.EvaluatorSupervisor do
|
|||
supervisor,
|
||||
{Evaluator, [formatter: Evaluator.DefaultFormatter]}
|
||||
) do
|
||||
{:ok, pid} -> {:ok, pid}
|
||||
{:ok, pid, _} -> {:ok, pid}
|
||||
:ignore -> {:error, :ignore}
|
||||
{:ok, _pid, evaluator} -> {:ok, evaluator}
|
||||
{:error, reason} -> {:error, reason}
|
||||
end
|
||||
end
|
||||
|
@ -38,7 +36,7 @@ defmodule Livebook.Runtime.ErlDist.EvaluatorSupervisor do
|
|||
"""
|
||||
@spec terminate_evaluator(pid(), Evaluator.t()) :: :ok
|
||||
def terminate_evaluator(supervisor, evaluator) do
|
||||
DynamicSupervisor.terminate_child(supervisor, evaluator)
|
||||
DynamicSupervisor.terminate_child(supervisor, evaluator.pid)
|
||||
:ok
|
||||
end
|
||||
end
|
||||
|
|
|
@ -142,8 +142,8 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
|
|||
|
||||
def handle_info({:DOWN, _, :process, pid, reason}, state) do
|
||||
state.evaluators
|
||||
|> Enum.find(fn {_container_ref, evaluator_pid} ->
|
||||
evaluator_pid == pid
|
||||
|> Enum.find(fn {_container_ref, evaluator} ->
|
||||
evaluator.pid == pid
|
||||
end)
|
||||
|> case do
|
||||
{container_ref, _} ->
|
||||
|
@ -235,7 +235,7 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
|
|||
state
|
||||
else
|
||||
{:ok, evaluator} = ErlDist.EvaluatorSupervisor.start_evaluator(state.evaluator_supervisor)
|
||||
Process.monitor(evaluator)
|
||||
Process.monitor(evaluator.pid)
|
||||
%{state | evaluators: Map.put(state.evaluators, container_ref, evaluator)}
|
||||
end
|
||||
end
|
||||
|
|
|
@ -4,7 +4,7 @@ defmodule Livebook.EvaluatorTest do
|
|||
alias Livebook.Evaluator
|
||||
|
||||
setup do
|
||||
evaluator = start_supervised!(Evaluator)
|
||||
{:ok, _pid, evaluator} = start_supervised(Evaluator)
|
||||
%{evaluator: evaluator}
|
||||
end
|
||||
|
||||
|
@ -253,7 +253,7 @@ defmodule Livebook.EvaluatorTest do
|
|||
|
||||
describe "initialize_from/3" do
|
||||
setup do
|
||||
parent_evaluator = start_supervised!(Evaluator, id: :parent_evaluator)
|
||||
{:ok, _pid, parent_evaluator} = start_supervised(Evaluator, id: :parent_evaluator)
|
||||
%{parent_evaluator: parent_evaluator}
|
||||
end
|
||||
|
||||
|
|
Loading…
Reference in a new issue