defmodule Hatatitla.Shared.Policy.SpaceMemberFilter do
  @moduledoc """
  Filter check that grants access to records tied to a space the actor is a member of.

  Each time the filter is built, the actor's memberships are read from
  `Hatatitla.Spaces.SpaceMembership` (with `authorize?: false`). The resulting
  space ids are matched against the primary key of the resource found at the
  end of `:path`.

  An actor that is not a map with a binary `:id` is treated as having no
  memberships, so the filter matches no records.

  ## Options

    * `:roles` - roles a membership must have to count. Defaults to
      `[:admin, :manager, :member]`. Pass `nil` or `[]` to accept a membership
      with any role.
    * `:path` - relationship path from the checked resource to the space, given
      as a single relationship name or a list of names. Defaults to `[:space]`,
      i.e. a direct `space` relationship on the resource.

  ## Examples

      # Resource with a direct `space` relationship
      authorize_if SpaceMemberFilter

      # User via space_membership
      authorize_if {SpaceMemberFilter, path: [:space_membership, :space]}

      # QuizResult via user -> space_membership
      authorize_if {SpaceMemberFilter,
                    path: [:user, :space_membership, :space], roles: [:admin, :manager]}

      # University via university_space join table
      authorize_if {SpaceMemberFilter, path: [:university_space, :space], roles: [:admin]}
  """

  use Ash.Policy.FilterCheck

  @default_path [:space]
  @default_roles [:admin, :manager, :member]

  @impl true
  def describe(opts) do
    path = get_path(opts)

    role_desc =
      case get_roles(opts) do
        roles when roles in [nil, []] -> ""
        roles -> " (roles: #{inspect(roles)})"
      end

    "space member filter via #{inspect(path)}#{role_desc}"
  end

  @impl true
  def filter(actor, context, opts) do
    resource = get_resource(context)
    path = get_path(opts)
    roles = get_roles(opts)

    # Resolve the path first so a misconfigured check fails before any query runs.
    {last_relationship, to_many?} = relationship_info(resource, path)
    pkey = Ash.Resource.Info.primary_key(last_relationship.destination)

    actor_valid_space_ids = get_actor_space_ids(actor, roles)

    if to_many? || opts[:ash_field_policy?] do
      # Any to-many hop in the path (or use as a field policy) goes through `exists`.
      expr(exists(^path, ^Enum.map(pkey, &{&1, {:in, actor_valid_space_ids}})))
    else
      # Otherwise reference the related primary key directly, AND-ing one
      # membership test per primary key field.
      Enum.reduce(pkey, nil, fn pkey_field, acc ->
        if acc do
          expr(^acc and ^ref(path, pkey_field) in ^actor_valid_space_ids)
        else
          expr(^ref(path, pkey_field) in ^actor_valid_space_ids)
        end
      end)
    end
  end

  # Returns the unique space ids of the actor's memberships, optionally limited
  # to the given roles. Anything that is not a map with a binary `:id` yields [].
  defp get_actor_space_ids(%{id: actor_id}, roles) when is_binary(actor_id) do
    Hatatitla.Spaces.SpaceMembership
    |> Ash.Query.for_read(:read)
    |> Ash.Query.filter(user_id == ^actor_id)
    |> filter_by_roles(roles)
    |> Ash.read!(authorize?: false)
    |> Enum.map(& &1.space_id)
    |> Enum.uniq()
  end

  defp get_actor_space_ids(_actor, _roles), do: []

  # `nil` and `[]` both mean "no role restriction".
  defp filter_by_roles(query, roles) when roles in [nil, []], do: query
  defp filter_by_roles(query, roles), do: Ash.Query.filter(query, role in ^roles)

  # Accepts a single relationship name as well as a list of names.
  defp get_path(opts), do: opts |> Keyword.get(:path, @default_path) |> List.wrap()

  defp get_roles(opts), do: Keyword.get(opts, :roles, @default_roles)

  defp get_resource(context), do: Map.fetch!(context, :resource)

  # Walks `path` from `resource` and returns `{last_relationship, to_many?}`,
  # where `to_many?` is true if any relationship along the way has cardinality :many.
  defp relationship_info(resource, path, to_many? \\ false)

  defp relationship_info(resource, [], _to_many?) do
    raise ArgumentError,
          "`SpaceMemberFilter` check on #{inspect(resource)} requires a non-empty `path`"
  end

  defp relationship_info(resource, [rel_key], to_many?) do
    rel = fetch_relationship!(resource, rel_key)
    {rel, to_many? || rel.cardinality == :many}
  end

  defp relationship_info(resource, [rel_key | rest], to_many?) do
    rel = fetch_relationship!(resource, rel_key)
    relationship_info(rel.destination, rest, to_many? || rel.cardinality == :many)
  end

  # Looks up a relationship on `resource`, raising when it does not exist.
  defp fetch_relationship!(resource, rel_key) do
    case Ash.Resource.Info.relationship(resource, rel_key) do
      nil ->
        raise "No such relationship ':#{rel_key}' for #{resource}, required in `SpaceMemberFilter` check"

      rel ->
        rel
    end
  end
end