Process WebSocket messages synchronously (#2371)

This commit is contained in:
Alexandre de Souza 2023-11-28 19:31:01 -03:00 committed by GitHub
parent 7f0c82b7bb
commit d7e12fc9f9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 60 additions and 40 deletions

View file

@ -88,6 +88,14 @@ defmodule Livebook.Hubs.TeamClient do
:exit, _ -> false
end
@doc """
Enqueues the synchronous event to be handled by the Team client.
"""
@spec handle_event(String.t(), {atom(), LivebookProto.event_proto()}) :: :ok
def handle_event(id, {topic, data}) do
GenServer.cast(registry_name(id), {:event, topic, data})
end
## GenServer callbacks
@impl true
@ -142,11 +150,13 @@ defmodule Livebook.Hubs.TeamClient do
@impl true
def handle_info(:connected, state) do
Hubs.Broadcasts.hub_connected(state.hub.id)
{:noreply, %{state | connected?: true, connection_error: nil}}
end
def handle_info({:connection_error, reason}, state) do
Hubs.Broadcasts.hub_connection_failed(state.hub.id, reason)
{:noreply, %{state | connected?: false, connection_error: reason}}
end
@ -163,6 +173,13 @@ defmodule Livebook.Hubs.TeamClient do
{:noreply, handle_event(topic, data, state)}
end
@impl true
def handle_cast({:event, topic, data}, state) do
Logger.debug("Received event #{topic} with data: #{inspect(data)}")
{:noreply, handle_event(topic, data, state)}
end
# Private
defp registry_name(id) do
@ -171,6 +188,7 @@ defmodule Livebook.Hubs.TeamClient do
defp put_secret(state, secret) do
state = remove_secret(state, secret)
%{state | secrets: [secret | state.secrets]}
end
@ -190,6 +208,7 @@ defmodule Livebook.Hubs.TeamClient do
defp put_file_system(state, file_system) do
state = remove_file_system(state, file_system)
%{state | file_systems: [file_system | state.file_systems]}
end

View file

@ -41,8 +41,7 @@ defmodule Livebook.Teams.Requests do
@spec org_sign(Team.t(), String.t()) ::
{:ok, map()} | {:error, map() | String.t()} | {:transport_error, String.t()}
def org_sign(team, payload) do
headers = auth_headers(team)
post("/api/v1/org/sign", %{payload: payload}, headers)
post("/api/v1/org/sign", %{payload: payload}, team)
end
@doc """
@ -54,10 +53,7 @@ defmodule Livebook.Teams.Requests do
secret_key = Teams.derive_key(team.teams_key)
secret_value = Teams.encrypt(secret.value, secret_key)
headers = auth_headers(team)
params = %{name: secret.name, value: secret_value}
post("/api/v1/org/secrets", params, headers)
post("/api/v1/org/secrets", %{name: secret.name, value: secret_value}, team)
end
@doc """
@ -69,10 +65,7 @@ defmodule Livebook.Teams.Requests do
secret_key = Teams.derive_key(team.teams_key)
secret_value = Teams.encrypt(secret.value, secret_key)
headers = auth_headers(team)
params = %{name: secret.name, value: secret_value}
put("/api/v1/org/secrets", params, headers)
put("/api/v1/org/secrets", %{name: secret.name, value: secret_value}, team)
end
@doc """
@ -81,10 +74,7 @@ defmodule Livebook.Teams.Requests do
@spec delete_secret(Team.t(), Secret.t()) ::
{:ok, String.t()} | {:error, map() | String.t()} | {:transport_error, String.t()}
def delete_secret(team, secret) do
headers = auth_headers(team)
params = %{name: secret.name}
delete("/api/v1/org/secrets", params, headers)
delete("/api/v1/org/secrets", %{name: secret.name}, team)
end
@doc """
@ -94,7 +84,6 @@ defmodule Livebook.Teams.Requests do
{:ok, map()} | {:error, map() | String.t()} | {:transport_error, String.t()}
def create_file_system(team, file_system) do
secret_key = Teams.derive_key(team.teams_key)
headers = auth_headers(team)
type = FileSystems.type(file_system)
%{name: name} = FileSystem.external_metadata(file_system)
@ -107,7 +96,7 @@ defmodule Livebook.Teams.Requests do
value: Teams.encrypt(json, secret_key)
}
post("/api/v1/org/file-systems", params, headers)
post("/api/v1/org/file-systems", params, team)
end
@doc """
@ -117,7 +106,6 @@ defmodule Livebook.Teams.Requests do
{:ok, map()} | {:error, map() | String.t()} | {:transport_error, String.t()}
def update_file_system(team, file_system) do
secret_key = Teams.derive_key(team.teams_key)
headers = auth_headers(team)
type = FileSystems.type(file_system)
%{name: name} = FileSystem.external_metadata(file_system)
@ -131,7 +119,7 @@ defmodule Livebook.Teams.Requests do
value: Teams.encrypt(json, secret_key)
}
put("/api/v1/org/file-systems", params, headers)
put("/api/v1/org/file-systems", params, team)
end
@doc """
@ -140,10 +128,7 @@ defmodule Livebook.Teams.Requests do
@spec delete_file_system(Team.t(), FileSystem.t()) ::
{:ok, String.t()} | {:error, map() | String.t()} | {:transport_error, String.t()}
def delete_file_system(team, file_system) do
headers = auth_headers(team)
params = %{id: file_system.external_id}
delete("/api/v1/org/file-systems", params, headers)
delete("/api/v1/org/file-systems", %{id: file_system.external_id}, team)
end
@doc """
@ -152,10 +137,8 @@ defmodule Livebook.Teams.Requests do
@spec create_deployment_group(Team.t(), DeploymentGroup.t()) ::
{:ok, map()} | {:error, map() | String.t()} | {:transport_error, String.t()}
def create_deployment_group(team, deployment_group) do
headers = auth_headers(team)
params = %{name: deployment_group.name, mode: deployment_group.mode}
post("/api/v1/org/deployment-groups", params, headers)
post("/api/v1/org/deployment-groups", params, team)
end
@doc """
@ -164,10 +147,8 @@ defmodule Livebook.Teams.Requests do
@spec update_deployment_group(Team.t(), DeploymentGroup.t()) ::
{:ok, map()} | {:error, map() | String.t()} | {:transport_error, String.t()}
def update_deployment_group(team, deployment_group) do
headers = auth_headers(team)
params = %{id: deployment_group.id, name: deployment_group.name, mode: deployment_group.mode}
put("/api/v1/org/deployment-groups", params, headers)
put("/api/v1/org/deployment-groups", params, team)
end
@doc """
@ -176,10 +157,7 @@ defmodule Livebook.Teams.Requests do
@spec delete_deployment_group(Team.t(), DeploymentGroup.t()) ::
{:ok, String.t()} | {:error, map() | String.t()} | {:transport_error, String.t()}
def delete_deployment_group(team, deployment_group) do
headers = auth_headers(team)
params = %{id: deployment_group.id}
delete("/api/v1/org/deployment-groups", params, headers)
delete("/api/v1/org/deployment-groups", %{id: deployment_group.id}, team)
end
@doc """
@ -210,26 +188,32 @@ defmodule Livebook.Teams.Requests do
]
end
defp post(path, json, headers \\ []) do
defp post(path, json, team \\ nil) do
body = {"application/json", Jason.encode!(json)}
request(:post, path, body: body, headers: headers)
request(:post, path, body: body, headers: auth_headers(team))
|> dispatch_messages(team)
end
defp put(path, json, headers) do
defp put(path, json, team) do
body = {"application/json", Jason.encode!(json)}
request(:put, path, body: body, headers: headers)
request(:put, path, body: body, headers: auth_headers(team))
|> dispatch_messages(team)
end
defp delete(path, json, headers) do
defp delete(path, json, team) do
body = {"application/json", Jason.encode!(json)}
request(:delete, path, body: body, headers: headers)
request(:delete, path, body: body, headers: auth_headers(team))
|> dispatch_messages(team)
end
defp get(path, params \\ %{}, headers \\ []) do
defp get(path, params \\ %{}) do
query_string = URI.encode_query(params)
path = if query_string != "", do: "#{path}?#{query_string}", else: path
request(:get, path, headers: headers)
request(:get, path, headers: [])
end
defp request(method, path, opts) do
@ -256,6 +240,23 @@ defmodule Livebook.Teams.Requests do
end
end
defp dispatch_messages({:ok, %{"messages" => _} = body}, team) do
{messages, body} = Map.pop!(body, "messages")
for message <- messages do
%{type: event} =
message
|> Base.url_decode64!(padding: false)
|> LivebookProto.Event.decode()
Livebook.Hubs.TeamClient.handle_event(team.id, event)
end
{:ok, body}
end
defp dispatch_messages(result, _), do: result
defp json?(headers) do
HTTP.fetch_content_type(headers) == {:ok, "application/json"}
end