diff --git a/config/config.exs b/config/config.exs index 1b885c837..79abc90f4 100644 --- a/config/config.exs +++ b/config/config.exs @@ -31,7 +31,6 @@ config :livebook, learn_notebooks: [], plugs: [], shutdown_callback: nil, - storage: Livebook.Storage.Ets, update_instructions_url: nil, within_iframe: false, allowed_uri_schemes: [] diff --git a/lib/livebook/application.ex b/lib/livebook/application.ex index a7e3d4b6c..fae7c36a6 100644 --- a/lib/livebook/application.ex +++ b/lib/livebook/application.ex @@ -19,7 +19,7 @@ defmodule Livebook.Application do # Start a supervisor for Livebook tasks {Task.Supervisor, name: Livebook.TaskSupervisor}, # Start the storage module - Livebook.Storage.current(), + Livebook.Storage, # Start the periodic version check Livebook.UpdateCheck, # Periodic measurement of system resources diff --git a/lib/livebook/settings.ex b/lib/livebook/settings.ex index e375dc478..01b3b7d48 100644 --- a/lib/livebook/settings.ex +++ b/lib/livebook/settings.ex @@ -82,16 +82,12 @@ defmodule Livebook.Settings do @spec remove_file_system(file_system_id()) :: :ok def remove_file_system(file_system_id) do if default_file_system_id() == file_system_id do - storage().delete_key(:settings, "global", :default_file_system_id) + Livebook.Storage.delete_key(:settings, "global", :default_file_system_id) end Livebook.NotebookManager.remove_file_system(file_system_id) - storage().delete(:filesystem, file_system_id) - end - - defp storage() do - Livebook.Storage.current() + Livebook.Storage.delete(:filesystem, file_system_id) end defp storage_to_fs(%{type: "s3"} = config) do @@ -230,7 +226,7 @@ defmodule Livebook.Settings do """ @spec set_default_file_system(file_system_id()) :: :ok def set_default_file_system(file_system_id) do - storage().insert(:settings, "global", default_file_system_id: file_system_id) + Livebook.Storage.insert(:settings, "global", default_file_system_id: file_system_id) end @doc """ @@ -238,7 +234,7 @@ defmodule Livebook.Settings do """ @spec default_file_system() :: Filesystem.t() def default_file_system() do - case storage().fetch(:filesystem, default_file_system_id()) do + case Livebook.Storage.fetch(:filesystem, default_file_system_id()) do {:ok, file} -> storage_to_fs(file) :error -> Livebook.Config.local_file_system() end @@ -249,7 +245,7 @@ defmodule Livebook.Settings do """ @spec default_file_system_id() :: file_system_id() def default_file_system_id() do - case storage().fetch_key(:settings, "global", :default_file_system_id) do + case Livebook.Storage.fetch_key(:settings, "global", :default_file_system_id) do {:ok, default_file_system_id} -> default_file_system_id :error -> "local" end diff --git a/lib/livebook/storage.ex b/lib/livebook/storage.ex index e9ec97586..ae0645655 100644 --- a/lib/livebook/storage.ex +++ b/lib/livebook/storage.ex @@ -1,8 +1,18 @@ defmodule Livebook.Storage do - @moduledoc """ - Behaviour defining an interface for storing arbitrary data in - [Entity-Attribute-Value](https://en.wikipedia.org/wiki/Entity%E2%80%93attribute%E2%80%93value_model) fashion. - """ + @moduledoc false + + # Storage for arbitrary data in [Entity-Attribute-Value](https://en.wikipedia.org/wiki/Entity%E2%80%93attribute%E2%80%93value_model) + # fashion. + # + # The storage uses an ETS table and synchronizes all the changes to + # a file, so they are restored when the application is started again. + # + # `insert` and `delete` operations are supposed to be called using a GenServer + # while all the lookups can be performed by directly accessing the named table. + + use GenServer + + require Logger @type namespace :: atom() @type entity_id :: binary() @@ -22,38 +32,69 @@ defmodule Livebook.Storage do end @doc """ - Returns all values in namespace. - - all(:filesystem) - [%{id: "rand-id", type: "s3", bucket_url: "/...", secret: "abc", access_key: "xyz"}] - + Starts the storage process. """ - @callback all(namespace()) :: [entity()] + @spec start_link(keyword()) :: GenServer.on_start() + def start_link(opts) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end @doc """ - Delegate for `c:all/1`. + Returns all values in namespace. + + ## Examples + + Livebook.Storage.all(:filesystem) + #=> [%{id: "rand-id", type: "s3", bucket_url: "/...", secret: "abc", access_key: "xyz"}] + """ - def all(namespace), do: current().all(namespace) + @spec all(namespace()) :: [entity()] + def all(namespace) do + table_name() + |> :ets.match({{namespace, :"$1"}, :"$2", :"$3", :_}) + |> Enum.group_by( + fn [entity_id, _attr, _val] -> entity_id end, + fn [_id, attr, val] -> {attr, val} end + ) + |> Enum.map(fn {entity_id, attributes} -> + attributes + |> Map.new() + |> Map.put(:id, entity_id) + end) + end @doc """ Returns a map identified by `entity_id` in `namespace`. - fetch(:filesystem, "rand-id") + ## Examples + + Livebook.Storage.fetch(:filesystem, "rand-id") #=> {:ok, %{id: "rand-id", type: "s3", bucket_url: "/...", secret: "abc", access_key: "xyz"}} """ - @callback fetch(namespace(), entity_id()) :: {:ok, entity()} | :error + @spec fetch(namespace(), entity_id()) :: {:ok, entity()} | :error + def fetch(namespace, entity_id) do + table_name() + |> :ets.lookup({namespace, entity_id}) + |> case do + [] -> + :error - @doc """ - Delegate for `c:fetch/2`. - """ - def fetch(namespace, id), do: current().fetch(namespace, id) + entries -> + entries + |> Enum.map(fn {_key, attr, val, _timestamp} -> {attr, val} end) + |> Map.new() + |> Map.put(:id, entity_id) + |> then(&{:ok, &1}) + end + end @doc """ Raising delegate for `c:fetch/2`. """ + @spec fetch!(namespace(), entity_id()) :: entity() def fetch!(namespace, id) do - case current().fetch(namespace, id) do + case fetch(namespace, id) do {:ok, entity} -> entity :error -> raise NotFoundError, namespace: namespace, id: id end @@ -62,47 +103,144 @@ defmodule Livebook.Storage do @doc """ Returns the value for a given `namespace`-`entity_id`-`attribute`. - fetch_key(:filesystem, "rand-id", :type) + ## Examples + + Livebook.Storage.fetch_key(:filesystem, "rand-id", :type) #=> {:ok, "s3"} """ - @callback fetch_key(namespace(), entity_id(), attribute()) :: {:ok, value()} | :error + @spec fetch_key(namespace(), entity_id(), attribute()) :: {:ok, value()} | :error + def fetch_key(namespace, entity_id, key) do + table_name() + |> :ets.match({{namespace, entity_id}, key, :"$1", :_}) + |> case do + [[value]] -> {:ok, value} + [] -> :error + end + end @doc """ - Delegate for `c:fetch_key/3`. + Inserts given list of attribute-value pairs to a entity belonging to + specified namespace. """ - def fetch_key(namespace, id, attribute), do: current().fetch_key(namespace, id, attribute) - - @doc """ - Inserts given list of attribute-value paris to a entity belonging to specified namespace. - """ - @callback insert(namespace(), entity_id(), [{attribute(), value()}]) :: :ok - - @doc """ - Delegate for `c:insert/3`. - """ - def insert(namespace, id, attributes), do: current().insert(namespace, id, attributes) + @spec insert(namespace(), entity_id(), [{attribute(), value()}]) :: :ok + def insert(namespace, entity_id, attributes) do + GenServer.call(__MODULE__, {:insert, namespace, entity_id, attributes}) + end @doc """ Deletes an entity of given id from given namespace. """ - @callback delete(namespace(), entity_id()) :: :ok - - @doc """ - Delegate for `c:delete/2`. - """ - def delete(namespace, id), do: current().delete(namespace, id) + @spec delete(namespace(), entity_id()) :: :ok + def delete(namespace, entity_id) do + GenServer.call(__MODULE__, {:delete, namespace, entity_id}) + end @doc """ Deletes an attribute from given entity. """ - @callback delete_key(namespace(), entity_id(), attribute()) :: :ok + @spec delete_key(namespace(), entity_id(), attribute()) :: :ok + def delete_key(namespace, entity_id, key) do + GenServer.call(__MODULE__, {:delete_key, namespace, entity_id, key}) + end @doc """ - Delegate for `c:delete_key/3`. + Returns file path where the data is persisted. """ - def delete_key(namespace, id, attribute), do: current().delete_key(namespace, id, attribute) + @spec config_file_path() :: Path.t() + def config_file_path() do + Path.join([Livebook.Config.data_path(), "livebook_config.ets"]) + end - @spec current() :: module() - def current(), do: Application.fetch_env!(:livebook, :storage) + @doc """ + Synchronously awaits for all prior changes to be processed. + """ + @spec sync() :: :ok + def sync() do + GenServer.call(__MODULE__, :sync) + end + + @impl true + def init(_opts) do + # Make sure that this process does not terminate abruptly + # in case it is persisting to disk. terminate/2 is still a no-op. + Process.flag(:trap_exit, true) + + table = load_or_create_table() + :persistent_term.put(__MODULE__, table) + + {:ok, %{table: table}} + end + + @impl true + def handle_call(:sync, _from, state) do + {:reply, :ok, state} + end + + @impl true + def handle_call({:insert, namespace, entity_id, attributes}, _from, %{table: table} = state) do + keys_to_delete = Enum.map(attributes, fn {key, _val} -> key end) + + delete_keys(table, namespace, entity_id, keys_to_delete) + + timestamp = System.os_time(:millisecond) + + attributes = + Enum.map(attributes, fn {attr, val} -> + {{namespace, entity_id}, attr, val, timestamp} + end) + + :ets.insert(table, attributes) + {:reply, :ok, state, {:continue, :save_to_file}} + end + + @impl true + def handle_call({:delete, namespace, entity_id}, _from, %{table: table} = state) do + :ets.delete(table, {namespace, entity_id}) + {:reply, :ok, state, {:continue, :save_to_file}} + end + + @impl true + def handle_call({:delete_key, namespace, entity_id, key}, _from, %{table: table} = state) do + delete_keys(table, namespace, entity_id, [key]) + {:reply, :ok, state, {:continue, :save_to_file}} + end + + @impl true + def handle_continue(:save_to_file, %{table: table} = state) do + file_path = String.to_charlist(config_file_path()) + :ok = :ets.tab2file(table, file_path) + {:noreply, state} + end + + defp table_name(), do: :persistent_term.get(__MODULE__) + + defp load_or_create_table() do + path = config_file_path() + + path + |> String.to_charlist() + |> :ets.file2tab() + |> case do + {:ok, tab} -> + Logger.info("Reading storage from #{path}") + tab + + {:error, reason} -> + case reason do + {:read_error, {:file_error, _, :enoent}} -> :ok + _ -> Logger.warning("Could not open up #{config_file_path()}: #{inspect(reason)}") + end + + :ets.new(__MODULE__, [:protected, :duplicate_bag]) + end + end + + defp delete_keys(table, namespace, entity_id, keys) do + match_head = {{namespace, entity_id}, :"$1", :_, :_} + + guards = Enum.map(keys, &{:==, :"$1", &1}) + + :ets.select_delete(table, [{match_head, guards, [true]}]) + end end diff --git a/lib/livebook/storage/ets.ex b/lib/livebook/storage/ets.ex deleted file mode 100644 index f40984f5e..000000000 --- a/lib/livebook/storage/ets.ex +++ /dev/null @@ -1,171 +0,0 @@ -defmodule Livebook.Storage.Ets do - @moduledoc """ - Ets implementation of `Livebook.Storage` behaviour. - - The module is supposed to be started just once as it - is responsible for managing a named ets table. - - `insert` and `delete` operations are supposed to be called using a GenServer - while all the lookups can be performed by directly accessing the named table. - """ - @behaviour Livebook.Storage - - require Logger - use GenServer - - @impl Livebook.Storage - def all(namespace) do - table_name() - |> :ets.match({{namespace, :"$1"}, :"$2", :"$3", :_}) - |> Enum.group_by( - fn [entity_id, _attr, _val] -> entity_id end, - fn [_id, attr, val] -> {attr, val} end - ) - |> Enum.map(fn {entity_id, attributes} -> - attributes - |> Map.new() - |> Map.put(:id, entity_id) - end) - end - - @impl Livebook.Storage - def fetch(namespace, entity_id) do - table_name() - |> :ets.lookup({namespace, entity_id}) - |> case do - [] -> - :error - - entries -> - entries - |> Enum.map(fn {_key, attr, val, _timestamp} -> {attr, val} end) - |> Map.new() - |> Map.put(:id, entity_id) - |> then(&{:ok, &1}) - end - end - - @impl Livebook.Storage - def fetch_key(namespace, entity_id, key) do - table_name() - |> :ets.match({{namespace, entity_id}, key, :"$1", :_}) - |> case do - [[value]] -> {:ok, value} - [] -> :error - end - end - - @spec config_file_path() :: Path.t() - def config_file_path() do - Path.join([Livebook.Config.data_path(), "livebook_config.ets"]) - end - - @spec sync() :: :ok - def sync() do - GenServer.call(__MODULE__, :sync) - end - - @impl Livebook.Storage - def insert(namespace, entity_id, attributes) do - GenServer.call(__MODULE__, {:insert, namespace, entity_id, attributes}) - end - - @impl Livebook.Storage - def delete(namespace, entity_id) do - GenServer.call(__MODULE__, {:delete, namespace, entity_id}) - end - - @impl Livebook.Storage - def delete_key(namespace, entity_id, key) do - GenServer.call(__MODULE__, {:delete_key, namespace, entity_id, key}) - end - - @spec start_link(keyword()) :: GenServer.on_start() - def start_link(opts) do - GenServer.start_link(__MODULE__, opts, name: __MODULE__) - end - - @impl GenServer - def init(_opts) do - # Make sure that this process does not terminate abruptly - # in case it is persisting to disk. terminate/2 is still a no-op. - Process.flag(:trap_exit, true) - - table = load_or_create_table() - :persistent_term.put(__MODULE__, table) - - {:ok, %{table: table}} - end - - @impl GenServer - def handle_call(:sync, _from, state) do - {:reply, :ok, state} - end - - @impl GenServer - def handle_call({:insert, namespace, entity_id, attributes}, _from, %{table: table} = state) do - keys_to_delete = Enum.map(attributes, fn {key, _val} -> key end) - - delete_keys(table, namespace, entity_id, keys_to_delete) - - timestamp = System.os_time(:millisecond) - - attributes = - Enum.map(attributes, fn {attr, val} -> - {{namespace, entity_id}, attr, val, timestamp} - end) - - :ets.insert(table, attributes) - {:reply, :ok, state, {:continue, :save_to_file}} - end - - @impl GenServer - def handle_call({:delete, namespace, entity_id}, _from, %{table: table} = state) do - :ets.delete(table, {namespace, entity_id}) - {:reply, :ok, state, {:continue, :save_to_file}} - end - - @impl GenServer - def handle_call({:delete_key, namespace, entity_id, key}, _from, %{table: table} = state) do - delete_keys(table, namespace, entity_id, [key]) - {:reply, :ok, state, {:continue, :save_to_file}} - end - - @impl GenServer - def handle_continue(:save_to_file, %{table: table} = state) do - file_path = String.to_charlist(config_file_path()) - :ok = :ets.tab2file(table, file_path) - {:noreply, state} - end - - defp table_name(), do: :persistent_term.get(__MODULE__) - - defp load_or_create_table() do - path = config_file_path() - - path - |> String.to_charlist() - |> :ets.file2tab() - |> case do - {:ok, tab} -> - Logger.info("Reading storage from #{path}") - tab - - {:error, reason} -> - case reason do - {:read_error, {:file_error, _, :enoent}} -> :ok - _ -> Logger.warning("Could not open up #{config_file_path()}: #{inspect(reason)}") - end - - :ets.new(__MODULE__, [:protected, :duplicate_bag]) - end - end - - defp delete_keys(table, namespace, entity_id, keys) do - match_head = {{namespace, entity_id}, :"$1", :_, :_} - - guards = Enum.map(keys, &{:==, :"$1", &1}) - - :ets.select_delete(table, [{match_head, guards, [true]}]) - end -end diff --git a/test/livebook/storage/ets_test.exs b/test/livebook/storage/ets_test.exs deleted file mode 100644 index 9082c4d85..000000000 --- a/test/livebook/storage/ets_test.exs +++ /dev/null @@ -1,125 +0,0 @@ -defmodule Livebook.Storage.EtsTest do - use ExUnit.Case, async: true - - alias Livebook.Storage.Ets - - describe "insert/3 and fetch/2" do - test "properly inserts a new key-value attribute" do - assert :ok = Ets.insert(:insert, "test", key1: "val1", key2: "val2") - - assert {:ok, - %{ - id: "test", - key1: "val1", - key2: "val2" - }} = Ets.fetch(:insert, "test") - end - - test "replaces already existing attributes with new values" do - assert :ok = Ets.insert(:insert, "replace", key1: "val1", key2: "val2") - - assert {:ok, - %{ - key1: "val1", - key2: "val2" - }} = Ets.fetch(:insert, "replace") - - assert :ok = - Ets.insert(:insert, "replace", key1: "updated_val1", key2: "val2", key3: "val3") - - assert {:ok, - %{ - key1: "updated_val1", - key2: "val2", - key3: "val3" - }} = Ets.fetch(:insert, "replace") - end - end - - describe "fetch_key/3" do - test "reads a given key" do - :ok = Ets.insert(:fetch_key, "test", key1: "val1") - assert Ets.fetch_key(:fetch_key, "test", :key1) == {:ok, "val1"} - assert Ets.fetch_key(:fetch_key, "test", :key2) == :error - end - - test "handles nil accordingly" do - assert Ets.fetch_key(:fetch_key, "test_nil", :key1) == :error - :ok = Ets.insert(:fetch_key, "test_nil", key1: nil) - assert Ets.fetch_key(:fetch_key, "test_nil", :key1) == {:ok, nil} - end - end - - test "fetch/2" do - :ok = Ets.insert(:fetch, "test", key1: "val1") - - assert {:ok, - %{ - id: "test", - key1: "val1" - }} = Ets.fetch(:fetch, "test") - - assert :error = Ets.fetch(:fetch, "unknown") - end - - test "delete/2" do - :ok = Ets.insert(:delete, "test", key1: "val1") - - assert {:ok, _entity} = Ets.fetch(:delete, "test") - - assert :ok = Ets.delete(:delete, "test") - - assert :error = Ets.fetch(:delete, "test") - end - - test "delete_key/3" do - :ok = Ets.insert(:delete_key, "test", key1: "val1", key2: "val2") - - assert :ok = Ets.delete_key(:delete_key, "test", :key2) - - assert {:ok, "val1"} = Ets.fetch_key(:delete_key, "test", :key1) - assert :error = Ets.fetch_key(:delete_key, "test", :key2) - end - - describe "all/1" do - test "returns all inserted entities for given namespace" do - :ok = Ets.insert(:all, "test1", key1: "val1") - :ok = Ets.insert(:all, "test2", key1: "val1") - - {:ok, entity1} = Ets.fetch(:all, "test1") - {:ok, entity2} = Ets.fetch(:all, "test2") - - assert [^entity1, ^entity2] = Enum.sort(Ets.all(:all)) - end - - test "returns an empty list if no entities exist for given namespace" do - assert [] = Ets.all(:unknown_namespace) - end - end - - describe "persistence" do - defp read_table_and_lookup(entity) do - :ok = Ets.sync() - - {:ok, tab} = - Ets.config_file_path() - |> String.to_charlist() - |> :ets.file2tab() - - :ets.lookup(tab, {:persistence, entity}) - end - - test "insert triggers saving to file" do - :ok = Ets.insert(:persistence, "insert", key: "val") - - assert [_test] = read_table_and_lookup("insert") - end - - test "delete triggers saving to file" do - :ok = Ets.insert(:persistence, "delete", key: "val") - :ok = Ets.delete(:persistence, "delete") - - assert [] = read_table_and_lookup("delete") - end - end -end diff --git a/test/livebook/storage_test.exs b/test/livebook/storage_test.exs new file mode 100644 index 000000000..899af70ec --- /dev/null +++ b/test/livebook/storage_test.exs @@ -0,0 +1,125 @@ +defmodule Livebook.StorageTest do + use ExUnit.Case, async: true + + alias Livebook.Storage + + describe "insert/3 and fetch/2" do + test "properly inserts a new key-value attribute" do + assert :ok = Storage.insert(:insert, "test", key1: "val1", key2: "val2") + + assert {:ok, + %{ + id: "test", + key1: "val1", + key2: "val2" + }} = Storage.fetch(:insert, "test") + end + + test "replaces already existing attributes with new values" do + assert :ok = Storage.insert(:insert, "replace", key1: "val1", key2: "val2") + + assert {:ok, + %{ + key1: "val1", + key2: "val2" + }} = Storage.fetch(:insert, "replace") + + assert :ok = + Storage.insert(:insert, "replace", key1: "updated_val1", key2: "val2", key3: "val3") + + assert {:ok, + %{ + key1: "updated_val1", + key2: "val2", + key3: "val3" + }} = Storage.fetch(:insert, "replace") + end + end + + describe "fetch_key/3" do + test "reads a given key" do + :ok = Storage.insert(:fetch_key, "test", key1: "val1") + assert Storage.fetch_key(:fetch_key, "test", :key1) == {:ok, "val1"} + assert Storage.fetch_key(:fetch_key, "test", :key2) == :error + end + + test "handles nil accordingly" do + assert Storage.fetch_key(:fetch_key, "test_nil", :key1) == :error + :ok = Storage.insert(:fetch_key, "test_nil", key1: nil) + assert Storage.fetch_key(:fetch_key, "test_nil", :key1) == {:ok, nil} + end + end + + test "fetch/2" do + :ok = Storage.insert(:fetch, "test", key1: "val1") + + assert {:ok, + %{ + id: "test", + key1: "val1" + }} = Storage.fetch(:fetch, "test") + + assert :error = Storage.fetch(:fetch, "unknown") + end + + test "delete/2" do + :ok = Storage.insert(:delete, "test", key1: "val1") + + assert {:ok, _entity} = Storage.fetch(:delete, "test") + + assert :ok = Storage.delete(:delete, "test") + + assert :error = Storage.fetch(:delete, "test") + end + + test "delete_key/3" do + :ok = Storage.insert(:delete_key, "test", key1: "val1", key2: "val2") + + assert :ok = Storage.delete_key(:delete_key, "test", :key2) + + assert {:ok, "val1"} = Storage.fetch_key(:delete_key, "test", :key1) + assert :error = Storage.fetch_key(:delete_key, "test", :key2) + end + + describe "all/1" do + test "returns all inserted entities for given namespace" do + :ok = Storage.insert(:all, "test1", key1: "val1") + :ok = Storage.insert(:all, "test2", key1: "val1") + + {:ok, entity1} = Storage.fetch(:all, "test1") + {:ok, entity2} = Storage.fetch(:all, "test2") + + assert [^entity1, ^entity2] = Enum.sort(Storage.all(:all)) + end + + test "returns an empty list if no entities exist for given namespace" do + assert [] = Storage.all(:unknown_namespace) + end + end + + describe "persistence" do + defp read_table_and_lookup(entity) do + :ok = Storage.sync() + + {:ok, tab} = + Storage.config_file_path() + |> String.to_charlist() + |> :ets.file2tab() + + :ets.lookup(tab, {:persistence, entity}) + end + + test "insert triggers saving to file" do + :ok = Storage.insert(:persistence, "insert", key: "val") + + assert [_test] = read_table_and_lookup("insert") + end + + test "delete triggers saving to file" do + :ok = Storage.insert(:persistence, "delete", key: "val") + :ok = Storage.delete(:persistence, "delete") + + assert [] = read_table_and_lookup("delete") + end + end +end