diff --git a/CHANGELOG.md b/CHANGELOG.md index d42993e8c..add321b4c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ # Changelog ## 1.0.0 - * Introduce `:remove_link` action in pipelines and bins. + * Introduce `:remove_child_pad` action in pipelines and bins. * Add children groups - a mechanism that allows refering to multiple children with a single identifier. * Rename `remove_child` action into `remove_children` and allow for removing a children group with a single action. * Add an ability to spawn anonymous children. @@ -15,6 +15,8 @@ * The flow control of the pad is now set with a single `:flow_control` option instead of `:mode` and `:demand_mode` options. * Remove _t suffix from types [#509](https://github.com/membraneframework/membrane_core/pull/509) * Implement automatic demands in Membrane Sinks and Endpoints. [#512](https://github.com/membraneframework/membrane_core/pull/512) + * Add `handle_child_pad_removed/4` callback in Bins and Pipelines. [#513](https://github.com/membraneframework/membrane_core/pull/513) + * Introduce support for crash groups in Bins. [#521](https://github.com/membraneframework/membrane_core/pull/521) ## 0.11.0 * Separate element_name and pad arguments in handle_element_{start, end}_of_stream signature [#219](https://github.com/membraneframework/membrane_core/issues/219) diff --git a/lib/membrane/bin.ex b/lib/membrane/bin.ex index 9d9ac69c8..56812b3c8 100644 --- a/lib/membrane/bin.ex +++ b/lib/membrane/bin.ex @@ -90,6 +90,19 @@ defmodule Membrane.Bin do ) :: callback_return + @doc """ + Callback invoked when a child removes its pad. + + Removing child's pad due to return `t:Membrane.Bin.Action.remove_child_pad()` + from `Membrane.Bin` callbacks does not trigger this callback. + """ + @callback handle_child_pad_removed( + child :: Child.name(), + pad :: Pad.ref(), + context :: CallbackContext.t(), + state :: state + ) :: callback_return + @doc """ Callback invoked when a notification comes in from an element. """ @@ -160,6 +173,17 @@ defmodule Membrane.Bin do state :: state ) :: callback_return + @doc """ + Callback invoked when crash of the crash group happens. + + Context passed to this callback contains 2 additional fields: `:members` and `:crash_initiator`. + """ + @callback handle_crash_group_down( + group_name :: Child.group(), + context :: CallbackContext.t(), + state + ) :: callback_return + @doc """ A callback invoked when the bin is being removed by its parent. @@ -168,8 +192,7 @@ defmodule Membrane.Bin do @callback handle_terminate_request( context :: CallbackContext.t(), state - ) :: - callback_return() + ) :: callback_return @optional_callbacks handle_init: 2, handle_pad_added: 3, @@ -183,7 +206,9 @@ defmodule Membrane.Bin do handle_child_notification: 4, handle_parent_notification: 3, handle_tick: 3, - handle_terminate_request: 2 + handle_crash_group_down: 3, + handle_terminate_request: 2, + handle_child_pad_removed: 4 @doc PadsSpecs.def_pad_docs(:input, :bin) defmacro def_input_pad(name, spec) do @@ -323,6 +348,9 @@ defmodule Membrane.Bin do @impl true def handle_parent_notification(_notification, _ctx, state), do: {[], state} + @impl true + def handle_crash_group_down(_group_name, _ctx, state), do: {[], state} + @impl true def handle_terminate_request(_ctx, state), do: {[terminate: :normal], state} @@ -337,6 +365,7 @@ defmodule Membrane.Bin do handle_element_end_of_stream: 4, handle_child_notification: 4, handle_parent_notification: 3, + handle_crash_group_down: 3, handle_terminate_request: 2 end end diff --git a/lib/membrane/bin/action.ex b/lib/membrane/bin/action.ex index 9f2388207..36ab7636a 100644 --- a/lib/membrane/bin/action.ex +++ b/lib/membrane/bin/action.ex @@ -58,7 +58,7 @@ defmodule Membrane.Bin.Action do Removed link has to have dynamic pads on both ends. """ - @type remove_link :: {:remove_link, {Child.name(), Pad.ref()}} + @type remove_child_pad :: {:remove_child_pad, {Child.name(), Pad.ref()}} @typedoc """ Starts a timer that will invoke `c:Membrane.Bin.handle_tick/3` callback @@ -134,7 +134,7 @@ defmodule Membrane.Bin.Action do | notify_parent | spec | remove_children - | remove_link + | remove_child_pad | start_timer | timer_interval | stop_timer diff --git a/lib/membrane/bin/callback_context.ex b/lib/membrane/bin/callback_context.ex index 9bcfd819c..e5c61ecf0 100644 --- a/lib/membrane/bin/callback_context.ex +++ b/lib/membrane/bin/callback_context.ex @@ -8,6 +8,9 @@ defmodule Membrane.Bin.CallbackContext do Field `:pad_options` is present only in `c:Membrane.Bin.handle_pad_added/3` and `c:Membrane.Bin.handle_pad_removed/3`. + + Fields `:members` and `:crash_initiator` are present only in + `c:Membrane.Pipeline.handle_crash_group_down/3`. """ @type t :: %{ :clock => Membrane.Clock.t(), @@ -18,6 +21,8 @@ defmodule Membrane.Bin.CallbackContext do :playback => Membrane.Playback.t(), :resource_guard => Membrane.ResourceGuard.t(), :utility_supervisor => Membrane.UtilitySupervisor.t(), - optional(:pad_options) => map() + optional(:pad_options) => map(), + optional(:members) => [Membrane.Child.name()], + optional(:crash_initiator) => Membrane.Child.name() } end diff --git a/lib/membrane/children_spec.ex b/lib/membrane/children_spec.ex index 08826bb89..4d5ec6a71 100644 --- a/lib/membrane/children_spec.ex +++ b/lib/membrane/children_spec.ex @@ -225,7 +225,7 @@ defmodule Membrane.ChildrenSpec do #### Limitations At this moment crash groups are only useful for elements with dynamic pads. - Crash groups work only in pipelines and are not supported in bins. + Crash groups work in pipelines and bins as well. ### Log metadata `:log_metadata` field can be used to set the `Membrane.Logger` metadata for all children in the given children specification. @@ -237,9 +237,8 @@ defmodule Membrane.ChildrenSpec do ``` {[ child(:a, A) |> child(:b, B), - {child(:c, C), crash_group: - {:second, :temporary}} - ], crash_group_mode: :temporary, group: :first, node: some_node} + {child(:c, C), group: :second, crash_group_mode: :temporary} + ], group: :first, crash_group_mode: :temporary, node: some_node} ``` Child `:c` will be spawned in the `:second` crash group, while children `:a` and `:b` will be spawned in the `:first` crash group. @@ -541,7 +540,8 @@ defmodule Membrane.ChildrenSpec do target_queue_size: number | nil, min_demand_factor: number | nil, auto_demand_size: number | nil, - throttling_factor: number | nil + throttling_factor: number | nil, + implicit_unlink?: boolean() ) :: builder() | no_return def via_in(builder, pad, props \\ []) @@ -566,7 +566,8 @@ defmodule Membrane.ChildrenSpec do min_demand_factor: [default: nil], auto_demand_size: [default: nil], toilet_capacity: [default: nil], - throttling_factor: [default: 1] + throttling_factor: [default: 1], + implicit_unlink?: [default: true] ) |> case do {:ok, props} -> @@ -579,7 +580,7 @@ defmodule Membrane.ChildrenSpec do if builder.status == :from_pad do builder else - via_out(builder, :output) + builder |> via_out(:output) end |> then(&%Builder{&1 | status: :to_pad, to_pad: pad, to_pad_props: Enum.into(props, %{})}) end diff --git a/lib/membrane/core/bin.ex b/lib/membrane/core/bin.ex index e552daebf..e5950b72a 100644 --- a/lib/membrane/core/bin.ex +++ b/lib/membrane/core/bin.ex @@ -183,6 +183,11 @@ defmodule Membrane.Core.Bin do {:noreply, state} end + defp do_handle_info(Message.new(:child_pad_removed, [child, pad]), state) do + state = Parent.ChildLifeController.handle_child_pad_removed(child, pad, state) + {:noreply, state} + end + defp do_handle_info(Message.new(:child_notification, [from, notification]), state) do state = Parent.LifecycleController.handle_child_notification(from, notification, state) {:noreply, state} diff --git a/lib/membrane/core/bin/action_handler.ex b/lib/membrane/core/bin/action_handler.ex index a4888c529..1324ce3a5 100644 --- a/lib/membrane/core/bin/action_handler.ex +++ b/lib/membrane/core/bin/action_handler.ex @@ -44,8 +44,8 @@ defmodule Membrane.Core.Bin.ActionHandler do end @impl CallbackHandler - def handle_action({:remove_link, {child_name, pad_ref}}, _cb, _params, state) do - Parent.ChildLifeController.handle_remove_link(child_name, pad_ref, state) + def handle_action({:remove_child_pad, {child_name, pad_ref}}, _cb, _params, state) do + Parent.ChildLifeController.handle_remove_child_pad(child_name, pad_ref, state) end @impl CallbackHandler diff --git a/lib/membrane/core/bin/pad_controller.ex b/lib/membrane/core/bin/pad_controller.ex index 9571aafcd..c8bb7eaef 100644 --- a/lib/membrane/core/bin/pad_controller.ex +++ b/lib/membrane/core/bin/pad_controller.ex @@ -80,6 +80,22 @@ defmodule Membrane.Core.Bin.PadController do state end + @spec remove_pad!(Pad.ref(), State.t()) :: State.t() + def remove_pad!(pad_ref, state) do + cond do + state.terminating? -> + state + + Pad.is_dynamic_pad_ref(pad_ref) -> + Message.send(state.parent_pid, :child_pad_removed, [state.name, pad_ref]) + PadModel.delete_data!(state, pad_ref) + + Pad.is_static_pad_ref(pad_ref) -> + raise Membrane.PadError, + "Tried to unlink bin static pad #{inspect(pad_ref)}. Static pads cannot be unlinked unless bin is terminating" + end + end + @spec handle_linking_timeout(Pad.ref(), State.t()) :: :ok | no_return() def handle_linking_timeout(pad_ref, state) do case PadModel.get_data(state, pad_ref) do @@ -193,58 +209,61 @@ defmodule Membrane.Core.Bin.PadController do Core.Bin.State.t() ) :: {Core.Element.PadController.link_call_reply(), Core.Bin.State.t()} def handle_link(direction, endpoint, other_endpoint, params, state) do - pad_data = PadModel.get_data!(state, endpoint.pad_ref) - Membrane.Logger.debug("Handle link #{inspect(endpoint, pretty: true)}") - %{spec_ref: spec_ref, endpoint: child_endpoint, name: pad_name} = pad_data - - pad_props = - Map.merge(endpoint.pad_props, child_endpoint.pad_props, fn key, - external_value, - internal_value -> - if key in [ - :target_queue_size, - :min_demand_factor, - :auto_demand_size, - :toilet_capacity, - :throttling_factor - ] do - external_value || internal_value - else - internal_value - end - end) - - child_endpoint = %{child_endpoint | pad_props: pad_props} - - if params.initiator == :sibling do - :ok = - Child.PadController.validate_pad_mode!( - {endpoint.pad_ref, pad_data}, - {other_endpoint.pad_ref, params.other_info} - ) - end + with {:ok, pad_data} <- PadModel.get_data(state, endpoint.pad_ref) do + %{spec_ref: spec_ref, endpoint: child_endpoint, name: pad_name} = pad_data + + pad_props = + Map.merge(endpoint.pad_props, child_endpoint.pad_props, fn key, + external_value, + internal_value -> + if key in [ + :target_queue_size, + :min_demand_factor, + :auto_demand_size, + :toilet_capacity, + :throttling_factor + ] do + external_value || internal_value + else + internal_value + end + end) - params = - Map.update!( - params, - :stream_format_validation_params, - &[{state.module, pad_name} | &1] - ) + child_endpoint = %{child_endpoint | pad_props: pad_props} - reply = - Message.call!(child_endpoint.pid, :handle_link, [ - direction, - child_endpoint, - other_endpoint, - params - ]) - - state = PadModel.set_data!(state, endpoint.pad_ref, :linked?, true) - state = PadModel.set_data!(state, endpoint.pad_ref, :endpoint, child_endpoint) - state = ChildLifeController.proceed_spec_startup(spec_ref, state) - {reply, state} + if params.initiator == :sibling do + :ok = + Child.PadController.validate_pad_mode!( + {endpoint.pad_ref, pad_data}, + {other_endpoint.pad_ref, params.other_info} + ) + end + + params = + Map.update!( + params, + :stream_format_validation_params, + &[{state.module, pad_name} | &1] + ) + + reply = + Message.call!(child_endpoint.pid, :handle_link, [ + direction, + child_endpoint, + other_endpoint, + params + ]) + + state = PadModel.set_data!(state, endpoint.pad_ref, :linked?, true) + state = PadModel.set_data!(state, endpoint.pad_ref, :endpoint, child_endpoint) + state = ChildLifeController.proceed_spec_startup(spec_ref, state) + {reply, state} + else + {:error, :unknown_pad} -> + {{:error, {:unknown_pad, state.name, state.module, endpoint.pad_ref}}, state} + end end @doc """ @@ -258,6 +277,7 @@ defmodule Membrane.Core.Bin.PadController do {pad_data, state} = PadModel.pop_data!(state, pad_ref) if endpoint do + IO.inspect(endpoint, label: "ENDPOINT") Message.send(endpoint.pid, :handle_unlink, endpoint.pad_ref) ChildLifeController.proceed_spec_startup(pad_data.spec_ref, state) else diff --git a/lib/membrane/core/element/pad_controller.ex b/lib/membrane/core/element/pad_controller.ex index a1ac7e87e..9f7f30eab 100644 --- a/lib/membrane/core/element/pad_controller.ex +++ b/lib/membrane/core/element/pad_controller.ex @@ -44,7 +44,9 @@ defmodule Membrane.Core.Element.PadController do {Endpoint.t(), PadModel.pad_info(), %{toilet: Toilet.t() | nil}} @type link_call_reply :: - :ok | {:ok, link_call_reply_props} | {:error, {:neighbor_dead, reason :: any}} + :ok + | {:ok, link_call_reply_props} + | {:error, {:neighbor_dead, reason :: any} | :unknown_pad} @default_auto_demand_size_factor 4000 diff --git a/lib/membrane/core/parent/child_life_controller.ex b/lib/membrane/core/parent/child_life_controller.ex index 29941e00e..e71078717 100644 --- a/lib/membrane/core/parent/child_life_controller.ex +++ b/lib/membrane/core/parent/child_life_controller.ex @@ -3,7 +3,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do use Bunch alias __MODULE__.{CrashGroupUtils, LinkUtils, StartupUtils} - alias Membrane.ChildrenSpec + alias Membrane.{Child, ChildrenSpec} alias Membrane.Core.{Bin, CallbackHandler, Component, Parent, Pipeline} alias Membrane.Core.Parent.{ @@ -15,6 +15,8 @@ defmodule Membrane.Core.Parent.ChildLifeController do SpecificationParser } + alias Membrane.Core.Parent.Link.Endpoint + alias Membrane.Pad alias Membrane.ParentError @@ -31,25 +33,25 @@ defmodule Membrane.Core.Parent.ChildLifeController do | :linked_internally | :linking_externally | :ready, - children_names: [Membrane.Child.name()], + children_names: [Child.name()], links_ids: [Link.id()], awaiting_responses: MapSet.t({Link.id(), Membrane.Pad.direction()}), - dependent_specs: MapSet.t(spec_ref) + dependencies: %{spec_ref() => [Child.name()]} } @type pending_specs :: %{spec_ref() => pending_spec()} @opaque parsed_children_spec_options :: %{ - group: Membrane.Child.group(), + group: Child.group(), crash_group_mode: Membrane.CrashGroup.mode(), - stream_sync: :sinks | [[Membrane.Child.name()]], - clock_provider: Membrane.Child.name() | nil, + stream_sync: :sinks | [[Child.name()]], + clock_provider: Child.name() | nil, node: node() | nil, log_metadata: Keyword.t() } @type children_spec_canonical_form :: [ - {[Membrane.ChildrenSpec.builder()], parsed_children_spec_options()} + {[ChildrenSpec.builder()], parsed_children_spec_options()} ] @spec_dependency_requiring_statuses [:initializing, :linking_internally] @@ -97,7 +99,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do until all bin pads of the spec are linked. Linking bin pads is actually routing link calls to proper bin children. - Mark spec children as ready, optionally request to play or terminate - - Cleanup spec: remove it from `pending_specs` and all other specs' `dependent_specs` and try proceeding startup + - Cleanup spec: remove it from `pending_specs` and all other specs' `dependencies` and try proceeding startup for all other pending specs that depended on the spec. """ @spec handle_spec(ChildrenSpec.t(), Parent.state()) :: Parent.state() | no_return() @@ -144,20 +146,21 @@ defmodule Membrane.Core.Parent.ChildLifeController do resolved_links = LinkUtils.resolve_links(links, spec_ref, state) state = %{state | links: Map.merge(state.links, Map.new(resolved_links, &{&1.id, &1}))} - dependent_specs = + dependencies = resolved_links - |> Enum.flat_map(&[&1.from.child_spec_ref, &1.to.child_spec_ref]) - |> Enum.filter(fn spec_ref -> + |> Enum.flat_map(&[&1.from, &1.to]) + |> Enum.map(&{&1.child_spec_ref, &1.child}) + |> Enum.filter(fn {spec_ref, _child} -> get_in(state, [:pending_specs, spec_ref, :status]) in @spec_dependency_requiring_statuses end) - |> MapSet.new() + |> Enum.group_by(fn {spec_ref, _child} -> spec_ref end) state = put_in(state, [:pending_specs, spec_ref], %{ status: :initializing, children_names: all_children_names, links_ids: Enum.map(links, & &1.id), - dependent_specs: dependent_specs, + dependencies: dependencies, awaiting_responses: MapSet.new() }) @@ -196,7 +199,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do end end - @spec make_canonical(Membrane.ChildrenSpec.t(), parsed_children_spec_options()) :: + @spec make_canonical(ChildrenSpec.t(), parsed_children_spec_options()) :: children_spec_canonical_form() defp make_canonical(spec, defaults \\ @default_children_spec_options) @@ -261,7 +264,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do defp get_child_ref(child_name_or_ref, group) do case child_name_or_ref do # child name created with child(...) - {:child_name, child_name} -> Membrane.Child.ref(child_name, group: group) + {:child_name, child_name} -> Child.ref(child_name, group: group) # child name created with get_child(...), bin_input() and bin_output() {:child_ref, ref} -> ref end @@ -346,13 +349,13 @@ defmodule Membrane.Core.Parent.ChildLifeController do defp do_proceed_spec_startup(spec_ref, %{status: :initializing} = spec_data, state) do Membrane.Logger.debug( - "Proceeding spec #{inspect(spec_ref)} startup: initializing, dependent specs: #{inspect(spec_data.dependent_specs)}" + "Proceeding spec #{inspect(spec_ref)} startup: initializing, dependent specs: #{inspect(Map.keys(spec_data.dependencies))}" ) %{children: children} = state if Enum.all?(spec_data.children_names, &Map.fetch!(children, &1).initialized?) and - Enum.empty?(spec_data.dependent_specs) do + Enum.empty?(spec_data.dependencies) do Membrane.Logger.debug("Spec #{inspect(spec_ref)} status changed to initialized") do_proceed_spec_startup(spec_ref, %{spec_data | status: :initialized}, state) else @@ -456,11 +459,13 @@ defmodule Membrane.Core.Parent.ChildLifeController do case Map.fetch(state.links, link_id) do {:ok, %Link{spec_ref: spec_ref}} -> state = - update_in( - state, - [:pending_specs, spec_ref, :awaiting_responses], - &MapSet.delete(&1, {link_id, direction}) - ) + with %{pending_specs: %{^spec_ref => _spec_data}} <- state do + update_in( + state, + [:pending_specs, spec_ref, :awaiting_responses], + &MapSet.delete(&1, {link_id, direction}) + ) + end proceed_spec_startup(spec_ref, state) @@ -469,7 +474,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do end end - @spec handle_child_initialized(Membrane.Child.name(), Parent.state()) :: Parent.state() + @spec handle_child_initialized(Child.name(), Parent.state()) :: Parent.state() def handle_child_initialized(child, state) do %{spec_ref: spec_ref} = Parent.ChildrenModel.get_child_data!(state, child) state = put_in(state, [:children, child, :initialized?], true) @@ -477,7 +482,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do end @spec handle_notify_child( - {Membrane.Child.name(), Membrane.ParentNotification.t()}, + {Child.name(), Membrane.ParentNotification.t()}, Parent.state() ) :: :ok def handle_notify_child({child_name, message}, state) do @@ -487,10 +492,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do end @spec handle_remove_children( - Membrane.Child.ref() - | [Membrane.Child.ref()] - | Membrane.Child.group() - | [Membrane.Child.group()], + Child.ref() | [Child.ref()] | Child.group() | [Child.group()], Parent.state() ) :: Parent.state() def handle_remove_children(children_or_children_groups, state) do @@ -522,14 +524,178 @@ defmodule Membrane.Core.Parent.ChildLifeController do """) end - data |> Enum.filter(& &1.ready?) |> Enum.each(&Message.send(&1.pid, :terminate)) + Enum.each(data, &Message.send(&1.pid, :terminate)) + + children_names = Enum.map(data, & &1.name) + state = remove_children_from_specs(children_names, state) + Parent.ChildrenModel.update_children!(state, refs, &%{&1 | terminating?: true}) end - @spec handle_remove_link(Membrane.Child.name(), Pad.ref(), Parent.state()) :: + @spec handle_remove_child_pad(Child.name(), Pad.ref(), Parent.state()) :: Parent.state() - def handle_remove_link(child_name, pad_ref, state) do - LinkUtils.remove_link(child_name, pad_ref, state) + def handle_remove_child_pad(child, pad, state) do + Enum.find(state.links, fn {_id, link} -> + link_contanins_child_with_pad?(link, child, pad) + end) + |> case do + {_id, + %Link{ + from: %Endpoint{child: ^child, pad_ref: ^pad}, + to: %Endpoint{pad_props: %{implicit_unlink?: false}} + } = link} -> + # TODO: handle case when spec is not ready yet + LinkUtils.unlink_endpoint(link.from, state) + |> update_in([:links, link.id], &%{&1 | from: nil, cut?: true}) + + {_id, %Link{} = link} -> + LinkUtils.remove_link!(link.id, state) + + nil -> + if Map.has_key?(state.children, child) do + raise ParentError, """ + Attempted to unlink pad #{inspect(pad)} of child #{inspect(child)}, but this child does not have this pad linked + """ + end + + raise ParentError, """ + Attempted to unlink pad #{inspect(pad)} of child #{inspect(child)}, but such a child does not exist + """ + end + end + + defp link_contanins_child_with_pad?(link, child, pad) do + case link do + %Link{from: %Endpoint{child: ^child, pad_ref: ^pad}} -> true + %Link{to: %Endpoint{child: ^child, pad_ref: ^pad}} -> true + %Link{} -> false + end + end + + defp remove_children_from_specs(children, state) do + children = Bunch.listify(children) + children_set = MapSet.new(children) + + children_links_ids_set = + Map.values(state.links) + |> Enum.filter(&(&1.from.child in children_set or &1.to.child in children_set)) + |> MapSet.new(& &1.id) + + affected_specs = + state.pending_specs + |> Enum.filter(fn {_ref, spec_data} -> + Enum.any?(spec_data.children_names, &(&1 in children_set)) or + Enum.any?(spec_data.links_ids, &(&1 in children_links_ids_set)) + end) + + updated_specs = + affected_specs + |> Map.new(fn {spec_ref, spec_data} -> + children_names = + spec_data.children_names + |> Enum.reject(&(&1 in children_set)) + + links_ids = Enum.reject(spec_data.links_ids, &(&1 in children_links_ids_set)) + + awaiting_responses = + spec_data.awaiting_responses + |> Enum.reject(fn {link_id, _direction} -> link_id in children_links_ids_set end) + |> MapSet.new() + + dependencies = + spec_data.dependencies + |> update_spec_dependencies(children_set) + + spec_data = %{ + spec_data + | children_names: children_names, + links_ids: links_ids, + awaiting_responses: awaiting_responses, + dependencies: dependencies + } + + {spec_ref, spec_data} + end) + + state = Map.update!(state, :pending_specs, &Map.merge(&1, updated_specs)) + + Enum.reduce(updated_specs, state, fn {spec_ref, _spec_data}, state -> + proceed_spec_startup(spec_ref, state) + end) + end + + @spec remove_link_from_spec(Link.id(), Parent.state()) :: Parent.state() + def remove_link_from_spec(link_id, state) do + {:ok, removed_link} = Map.fetch(state.links, link_id) + spec_ref = removed_link.spec_ref + + with {:ok, spec_data} <- Map.fetch(state.pending_specs, spec_ref) do + links_ids = Enum.reject(spec_data.links_ids, &(&1 == link_id)) + + spec_links_endpoints = + Enum.flat_map(links_ids, fn id -> + link = state.links[id] + [link.from.child, link.to.child] + end) + + dependencies = + [removed_link.from.child, removed_link.to.child] + |> Enum.filter(&(&1 not in spec_links_endpoints)) + |> case do + [] -> + spec_data.dependencies + + endpoints_to_remove -> + spec_data.dependencies + |> update_spec_dependencies(endpoints_to_remove) + end + + awaiting_responses = + spec_data.awaiting_responses + |> MapSet.difference(MapSet.new([{link_id, :input}, {link_id, :output}])) + + spec_data = %{ + spec_data + | dependencies: dependencies, + links_ids: links_ids, + awaiting_responses: awaiting_responses + } + + state = put_in(state, [:pending_specs, spec_ref], spec_data) + proceed_spec_startup(spec_ref, state) + else + :error -> state + end + end + + defp update_spec_dependencies(spec_dependencies, children_removed_from_spec) do + spec_dependencies + |> Enum.map(fn {spec_ref, spec_children} -> + { + spec_ref, + Enum.reject(spec_children, &(&1 in children_removed_from_spec)) + } + end) + |> Enum.reject(&match?({_ref, []}, &1)) + |> Map.new() + end + + @spec handle_child_pad_removed(Child.name(), Pad.ref(), Parent.state()) :: Parent.state() + def handle_child_pad_removed(child, pad, state) do + Membrane.Logger.debug_verbose("Child #{inspect(child)} removed pad #{inspect(pad)}") + + Parent.ChildrenModel.assert_child_exists!(state, child) + + state = + CallbackHandler.exec_and_handle_callback( + :handle_child_pad_removed, + Component.action_handler(state), + %{context: &Component.context_from_state/1}, + [child, pad], + state + ) + + LinkUtils.handle_child_pad_removed(child, pad, state) end @doc """ @@ -539,7 +705,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do - handles crash group (if applicable) """ @spec handle_child_death( - child_name :: Membrane.Child.name(), + child_name :: Child.name(), reason :: any(), state :: Parent.state() ) :: {:stop | :continue, Parent.state()} @@ -564,21 +730,16 @@ defmodule Membrane.Core.Parent.ChildLifeController do %{pid: child_pid} = ChildrenModel.get_child_data!(state, child_name) with {:ok, group} <- CrashGroupUtils.get_group_by_member_pid(child_pid, state) do + state = + group.members + |> Enum.filter(&Map.has_key?(state.children, &1)) + |> remove_children_from_specs(state) + {result, state} = crash_all_group_members(group, child_name, state) |> remove_child_from_crash_group(group, child_pid) if result == :removed do - state = - Enum.reduce(group.members, state, fn child_name, state -> - with {%{spec_ref: spec_ref}, state} <- pop_in(state, [:children, child_name]) do - state = LinkUtils.unlink_element(child_name, state) - cleanup_spec_startup(spec_ref, state) - else - {nil, state} -> state - end - end) - exec_handle_crash_group_down_callback( group.name, group.members, @@ -588,6 +749,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do else state end + |> Bunch.Access.delete_in([:children, child_name]) else {:error, :not_member} when reason == {:shutdown, :membrane_crash_group_kill} -> raise Membrane.PipelineError, @@ -621,15 +783,15 @@ defmodule Membrane.Core.Parent.ChildLifeController do end defp remove_spec_from_dependencies(spec_ref, state) do - dependent_specs = + dependencies = state.pending_specs - |> Enum.filter(fn {_ref, data} -> spec_ref in data.dependent_specs end) + |> Enum.filter(fn {_ref, data} -> Map.has_key?(data.dependencies, spec_ref) end) |> Map.new(fn {ref, data} -> - {ref, Map.update!(data, :dependent_specs, &MapSet.delete(&1, spec_ref))} + {ref, Map.update!(data, :dependencies, &Map.delete(&1, spec_ref))} end) - state = %{state | pending_specs: Map.merge(state.pending_specs, dependent_specs)} - dependent_specs |> Map.keys() |> Enum.reduce(state, &proceed_spec_startup/2) + state = %{state | pending_specs: Map.merge(state.pending_specs, dependencies)} + dependencies |> Map.keys() |> Enum.reduce(state, &proceed_spec_startup/2) end defp exec_handle_crash_group_down_callback( @@ -654,7 +816,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do end # called when process was a member of a crash group - @spec crash_all_group_members(CrashGroup.t(), Membrane.Child.name(), Parent.state()) :: + @spec crash_all_group_members(CrashGroup.t(), Child.name(), Parent.state()) :: Parent.state() defp crash_all_group_members( %CrashGroup{triggered?: false} = crash_group, diff --git a/lib/membrane/core/parent/child_life_controller/link_utils.ex b/lib/membrane/core/parent/child_life_controller/link_utils.ex index 04aa58418..c36daf9c1 100644 --- a/lib/membrane/core/parent/child_life_controller/link_utils.ex +++ b/lib/membrane/core/parent/child_life_controller/link_utils.ex @@ -3,7 +3,7 @@ defmodule Membrane.Core.Parent.ChildLifeController.LinkUtils do use Bunch - alias Membrane.ParentError + alias Membrane.Child alias Membrane.Core.{Bin, Message, Parent, Telemetry} alias Membrane.Core.Bin.PadController @@ -29,54 +29,92 @@ defmodule Membrane.Core.Parent.ChildLifeController.LinkUtils do %CrashGroup{members: members_names} = crash_group Enum.reduce(members_names, state, fn member_name, state -> - unlink_element(member_name, state) + with %{children: %{^member_name => _data}} <- state do + unlink_element(member_name, state) + end end) end - @spec remove_link(Membrane.Child.name(), Pad.ref(), Parent.state()) :: Parent.state() - def remove_link(child_name, pad_ref, state) do - Enum.find(state.links, fn {_id, link} -> - [link.from, link.to] - |> Enum.any?(&(&1.child == child_name and &1.pad_ref == pad_ref)) - end) - |> case do - {_id, %Link{} = link} -> - for endpoint <- [link.from, link.to] do + @spec handle_child_pad_removed(Child.name(), Pad.ref(), Parent.state()) :: Parent.state() + def handle_child_pad_removed(child, pad, state) do + {:ok, link} = get_link(state.links, child, pad) + + state = + opposite_endpoint(link, child) + |> case do + %Endpoint{child: {Membrane.Bin, :itself}} = bin_endpoint -> + PadController.remove_pad!(bin_endpoint.pad_ref, state) + + %Endpoint{} = endpoint -> Message.send(endpoint.pid, :handle_unlink, endpoint.pad_ref) - end + state + end - links = Map.delete(state.links, link.id) - Map.put(state, :links, links) + state = ChildLifeController.remove_link_from_spec(link.id, state) - nil -> - with %{^child_name => _child_entry} <- state.children do - raise ParentError, """ - Attempted to unlink pad #{inspect(pad_ref)} of child #{inspect(child_name)}, but this child does not have this pad linked - """ - end + delete_link(link, state) + end - raise ParentError, """ - Attempted to unlink pad #{inspect(pad_ref)} of child #{inspect(child_name)}, but such a child does not exist - """ - end + @spec remove_link!(Link.id(), Parent.state()) :: Parent.state() + def remove_link!(link_id, state) do + link = Map.fetch!(state.links, link_id) + + state = + [link.from, link.to] + |> Enum.reject(&(&1 == nil)) + |> Enum.reduce(state, &unlink_endpoint/2) + + state = ChildLifeController.remove_link_from_spec(link_id, state) + delete_link(link, state) end - @spec unlink_element(Membrane.Child.name(), Parent.state()) :: Parent.state() + @spec unlink_element(Child.name(), Parent.state()) :: Parent.state() def unlink_element(child_name, state) do - Map.update!( - state, - :links, - &Map.reject(&1, fn {_id, %Link{} = link} -> - case endpoint_to_unlink(child_name, link) do - %Endpoint{pid: pid, pad_ref: pad_ref} -> - Message.send(pid, :handle_unlink, pad_ref) - true + grouped_links = + state.links + |> Map.values() + |> Enum.group_by(fn + %Link{from: %{child: ^child_name}, to: %{pad_props: %{implicit_unlink?: false}}} -> + :cut_links + + %Link{from: %{child: ^child_name}} -> + :deleted_links - nil -> - false + %Link{to: %{child: ^child_name}} -> + :deleted_links + + %Link{} -> + :unchanged_links + end) + + cut_links = Map.get(grouped_links, :cut_links, []) + deleted_links = Map.get(grouped_links, :deleted_links, []) + unchanged_links = Map.get(grouped_links, :unchanged_links, []) + + state = + Enum.reduce(deleted_links, state, fn link, state -> + case endpoint_to_unlink(child_name, link) do + %Endpoint{} = endpoint -> unlink_endpoint(endpoint, state) + nil -> state end end) - ) + + links = + Enum.map(cut_links, &%{&1 | from: nil, cut?: true}) + |> Enum.concat(unchanged_links) + |> Map.new(&{&1.id, &1}) + + %{state | links: links} + end + + @spec unlink_endpoint(Endpoint.t(), Parent.state()) :: Parent.state() + def unlink_endpoint(%Endpoint{child: {Membrane.Bin, :itself}} = endpoint, state) do + PadController.remove_pad!(endpoint.pad_ref, state) + end + + def unlink_endpoint(%Endpoint{} = endpoint, state) do + Message.send(endpoint.pid, :handle_unlink, endpoint.pad_ref) + state end defp endpoint_to_unlink(child_name, %Link{from: %Endpoint{child: child_name}, to: to}), do: to @@ -145,6 +183,34 @@ defmodule Membrane.Core.Parent.ChildLifeController.LinkUtils do links end + defp get_link(links, child, pad) do + Enum.find(links, fn {_id, link} -> + [link.from, link.to] + |> Enum.any?(&(&1.child == child and &1.pad_ref == pad)) + end) + |> case do + {_id, %Link{} = link} -> {:ok, link} + nil -> {:error, :not_found} + end + end + + defp opposite_endpoint(%Link{from: %Endpoint{child: child}, to: to}, child), do: to + + defp opposite_endpoint(%Link{to: %Endpoint{child: child}, from: from}, child), do: from + + defp delete_link(link, state) do + {_link, state} = pop_in(state, [:links, link.id]) + spec_ref = link.spec_ref + + with {:ok, spec_data} <- Map.fetch(state.pending_specs, spec_ref) do + new_links_ids = Enum.reject(spec_data.links_ids, &(&1 == link.id)) + state = put_in(state, [:pending_specs, spec_ref, :links_ids], new_links_ids) + ChildLifeController.proceed_spec_startup(spec_ref, state) + else + :error -> state + end + end + defp validate_links(links, state) do links |> Enum.concat(Map.values(state.links)) @@ -239,23 +305,22 @@ defmodule Membrane.Core.Parent.ChildLifeController.LinkUtils do if {Membrane.Bin, :itself} in [from.child, to.child] do state else - from_availability = Pad.availability_mode(from.pad_info.availability) - to_availability = Pad.availability_mode(to.pad_info.availability) params = %{initiator: :parent, stream_format_validation_params: []} case Message.call(from.pid, :handle_link, [:output, from, to, params]) do :ok -> put_in(state, [:links, link.id, :linked?], true) - {:error, {:call_failure, _reason}} when to_availability == :static -> - Process.exit(to.pid, :kill) - state + {:error, {:unknown_pad, name, module, pad}} -> + Membrane.Logger.debug(""" + Failed to establish link between #{inspect(from.pad_ref)} and #{inspect(to.pad_ref)} + because pad #{inspect(pad)} of component named #{inspect(name)} (#{inspect(module)}) + is down. + """) - {:error, {:neighbor_dead, _reason}} when from_availability == :static -> - Process.exit(from.pid, :kill) state - {:error, {:call_failure, _reason}} when to_availability == :dynamic -> + {:error, {:call_failure, _reason}} -> Membrane.Logger.debug(""" Failed to establish link between #{inspect(from.pad_ref)} and #{inspect(to.pad_ref)} because #{inspect(from.child)} is down. @@ -263,7 +328,7 @@ defmodule Membrane.Core.Parent.ChildLifeController.LinkUtils do state - {:error, {:neighbor_dead, _reason}} when from_availability == :dynamic -> + {:error, {:neighbor_dead, _reason}} -> Membrane.Logger.debug(""" Failed to establish link between #{inspect(from.pad_ref)} and #{inspect(to.pad_ref)} because #{inspect(to.child)} is down. diff --git a/lib/membrane/core/parent/link.ex b/lib/membrane/core/parent/link.ex index 2e078b5c4..27d5beab0 100644 --- a/lib/membrane/core/parent/link.ex +++ b/lib/membrane/core/parent/link.ex @@ -6,7 +6,7 @@ defmodule Membrane.Core.Parent.Link do alias __MODULE__.Endpoint @enforce_keys [:id, :from, :to] - defstruct @enforce_keys ++ [linked?: false, spec_ref: nil] + defstruct @enforce_keys ++ [linked?: false, spec_ref: nil, cut?: false] @type id :: reference() @@ -15,6 +15,7 @@ defmodule Membrane.Core.Parent.Link do from: Endpoint.t(), to: Endpoint.t(), linked?: boolean(), + cut?: boolean(), spec_ref: Membrane.Core.Parent.ChildLifeController.spec_ref() } end diff --git a/lib/membrane/core/pipeline.ex b/lib/membrane/core/pipeline.ex index 18f978416..28b25da77 100644 --- a/lib/membrane/core/pipeline.ex +++ b/lib/membrane/core/pipeline.ex @@ -73,6 +73,12 @@ defmodule Membrane.Core.Pipeline do {:noreply, state} end + @impl GenServer + def handle_info(Message.new(:child_pad_removed, [child, pad]), state) do + state = ChildLifeController.handle_child_pad_removed(child, pad, state) + {:noreply, state} + end + @impl GenServer def handle_info(Message.new(:child_notification, [from, notification]), state) do state = LifecycleController.handle_child_notification(from, notification, state) diff --git a/lib/membrane/core/pipeline/action_handler.ex b/lib/membrane/core/pipeline/action_handler.ex index b078dc021..d13e42cd0 100644 --- a/lib/membrane/core/pipeline/action_handler.ex +++ b/lib/membrane/core/pipeline/action_handler.ex @@ -49,8 +49,8 @@ defmodule Membrane.Core.Pipeline.ActionHandler do end @impl CallbackHandler - def handle_action({:remove_link, {child_name, pad_ref}}, _cb, _params, state) do - Parent.ChildLifeController.handle_remove_link(child_name, pad_ref, state) + def handle_action({:remove_child_pad, {child, pad}}, _cb, _params, state) do + Parent.ChildLifeController.handle_remove_child_pad(child, pad, state) end @impl CallbackHandler diff --git a/lib/membrane/pad.ex b/lib/membrane/pad.ex index a6a927a23..3b5b8bf1f 100644 --- a/lib/membrane/pad.ex +++ b/lib/membrane/pad.ex @@ -164,6 +164,12 @@ defmodule Membrane.Pad do (term |> is_tuple and term |> tuple_size == 3 and term |> elem(0) == __MODULE__ and term |> elem(1) |> is_atom) + defguard is_static_pad_ref(term) when is_atom(term) + + defguard is_dynamic_pad_ref(term) + when term |> is_tuple and term |> tuple_size == 3 and term |> elem(0) == __MODULE__ and + term |> elem(1) |> is_atom + defguard is_pad_name(term) when is_atom(term) defguard is_availability(term) when term in @availability_values diff --git a/lib/membrane/pipeline.ex b/lib/membrane/pipeline.ex index 4272742bd..11100ca45 100644 --- a/lib/membrane/pipeline.ex +++ b/lib/membrane/pipeline.ex @@ -29,7 +29,7 @@ defmodule Membrane.Pipeline do use Membrane.Pipeline def start_link(options) do - Membrane.Pipeline.start_link(options, name: MyPipeline) + Membrane.Pipeline.start_link(__MODULE__, options, name: MyPipeline) end # ... @@ -135,6 +135,19 @@ defmodule Membrane.Pipeline do ) :: callback_return + @doc """ + Callback invoked when a child removes its pad. + + Removing child's pad due to return `t:Membrane.Bin.Action.remove_child_pad()` + from `Membrane.Pipeline` callbacks does not trigger this callback. + """ + @callback handle_child_pad_removed( + child :: Child.name(), + pad :: Pad.ref(), + context :: CallbackContext.t(), + state :: state + ) :: callback_return + @doc """ Callback invoked when a notification comes in from an element. """ @@ -231,7 +244,8 @@ defmodule Membrane.Pipeline do handle_tick: 3, handle_crash_group_down: 3, handle_call: 3, - handle_terminate_request: 2 + handle_terminate_request: 2, + handle_child_pad_removed: 4 @doc """ Starts the Pipeline based on given module and links it to the current diff --git a/lib/membrane/pipeline/action.ex b/lib/membrane/pipeline/action.ex index f41809f6e..c8c40f688 100644 --- a/lib/membrane/pipeline/action.ex +++ b/lib/membrane/pipeline/action.ex @@ -48,7 +48,7 @@ defmodule Membrane.Pipeline.Action do Removed link has to have dynamic pads on both ends. """ - @type remove_link :: {:remove_link, {Child.name(), Pad.ref()}} + @type remove_child_pad :: {:remove_child_pad, {Child.name(), Pad.ref()}} @typedoc """ Starts a timer that will invoke `c:Membrane.Pipeline.handle_tick/3` callback @@ -135,7 +135,7 @@ defmodule Membrane.Pipeline.Action do | notify_child | spec | remove_children - | remove_link + | remove_child_pad | start_timer | timer_interval | stop_timer diff --git a/lib/membrane/testing/assertions.ex b/lib/membrane/testing/assertions.ex index 1da52d45e..28f4f111b 100644 --- a/lib/membrane/testing/assertions.ex +++ b/lib/membrane/testing/assertions.ex @@ -449,6 +449,23 @@ defmodule Membrane.Testing.Assertions do ) end + @doc """ + Asserts that `Membrane.Testing.Pipeline` child with name `child` removed or is going to + remove it's pad with ref `pad` within the `timeout` period specified in milliseconds. + """ + defmacro assert_child_pad_removed( + pipeline, + child, + pad, + timeout \\ @default_timeout + ) do + assert_receive_from_pipeline( + pipeline, + {:handle_child_pad_removed, {child, pad}}, + timeout + ) + end + @doc """ Asserts that a cleanup function was registered in `Membrane.Testing.MockResourceGuard`. """ diff --git a/lib/membrane/testing/pipeline.ex b/lib/membrane/testing/pipeline.ex index 7d56c094b..e7da29f0e 100644 --- a/lib/membrane/testing/pipeline.ex +++ b/lib/membrane/testing/pipeline.ex @@ -92,7 +92,7 @@ defmodule Membrane.Testing.Pipeline do @moduledoc false @enforce_keys [:test_process, :module] - defstruct @enforce_keys ++ [:custom_pipeline_state] + defstruct @enforce_keys ++ [:custom_pipeline_state, :raise_on_child_pad_removed?] @typedoc """ Structure for holding state @@ -109,7 +109,8 @@ defmodule Membrane.Testing.Pipeline do @type t :: %__MODULE__{ test_process: pid() | nil, module: module() | nil, - custom_pipeline_state: Pipeline.state() + custom_pipeline_state: Pipeline.state(), + raise_on_child_pad_removed?: boolean() | nil } end @@ -118,7 +119,8 @@ defmodule Membrane.Testing.Pipeline do module: :default, spec: [ChildrenSpec.builder()], test_process: pid(), - name: Pipeline.name() + name: Pipeline.name(), + raise_on_child_pad_removed?: boolean() ] | [ module: module(), @@ -285,8 +287,15 @@ defmodule Membrane.Testing.Pipeline do case Keyword.get(options, :module, :default) do :default -> spec = Bunch.listify(Keyword.get(options, :spec, [])) + test_process = Keyword.fetch!(options, :test_process) + raise? = Keyword.get(options, :raise_on_child_pad_removed?, true) + + new_state = %State{ + test_process: test_process, + raise_on_child_pad_removed?: raise?, + module: nil + } - new_state = %State{test_process: Keyword.fetch!(options, :test_process), module: nil} {[spec: spec], new_state} module when is_atom(module) -> @@ -466,6 +475,29 @@ defmodule Membrane.Testing.Pipeline do {custom_actions, Map.put(state, :custom_pipeline_state, custom_state)} end + @impl true + def handle_child_pad_removed(child, pad, ctx, state) do + if state.raise_on_child_pad_removed? do + raise """ + Child #{inspect(child)} removed it's pad #{inspect(pad)}. If you want to + handle such a scenario, pass `raise_on_child_pad_removed?: false` option to + `Membrane.Testing.Pipeline.start_*/2` or pass there a pipeline module + implementing this callback via `:name` option. + """ + end + + {custom_actions, custom_state} = + eval_injected_module_callback( + :handle_child_pad_removed, + [child, pad, ctx], + state + ) + + :ok = notify_test_process(state.test_process, {:handle_child_pad_removed, {child, pad}}) + + {custom_actions, Map.put(state, :custom_pipeline_state, custom_state)} + end + @impl true def handle_crash_group_down(group_name, ctx, state) do {custom_actions, custom_state} = @@ -482,8 +514,8 @@ defmodule Membrane.Testing.Pipeline do defp eval_injected_module_callback(callback, args, state) - defp eval_injected_module_callback(_callback, _args, %State{module: nil} = state), - do: {[], state} + defp eval_injected_module_callback(_callback, _args, %State{module: nil}), + do: {[], nil} defp eval_injected_module_callback(callback, args, state) do apply(state.module, callback, args ++ [state.custom_pipeline_state]) diff --git a/test/membrane/core/element_test.exs b/test/membrane/core/element_test.exs index 549c1604b..a4ef0ad7f 100644 --- a/test/membrane/core/element_test.exs +++ b/test/membrane/core/element_test.exs @@ -255,7 +255,10 @@ defmodule Membrane.Core.ElementTest do test "should handle unlinking pads" do assert {:noreply, state} = - Element.handle_info(Message.new(:handle_unlink, :dynamic_input), linked_state()) + Element.handle_info( + Message.new(:handle_unlink, :dynamic_input), + linked_state() + ) refute Map.has_key?(state.pads_data, :dynamic_input) end diff --git a/test/membrane/element_test.exs b/test/membrane/element_test.exs index baf206417..bcd7075ce 100644 --- a/test/membrane/element_test.exs +++ b/test/membrane/element_test.exs @@ -99,7 +99,6 @@ defmodule Membrane.ElementTest do end describe "End of stream" do - @tag :target test "causes handle_end_of_stream/3 to be called", %{pipeline: pipeline} do assert_pipeline_play(pipeline) diff --git a/test/membrane/integration/child_crash_test.exs b/test/membrane/integration/child_crash_test.exs index f397d5213..7a63a03e8 100644 --- a/test/membrane/integration/child_crash_test.exs +++ b/test/membrane/integration/child_crash_test.exs @@ -1,11 +1,15 @@ defmodule Membrane.Integration.ChildCrashTest do use ExUnit.Case, async: false + import Membrane.ChildrenSpec import Membrane.Testing.Assertions alias Membrane.Support.ChildCrashTest alias Membrane.Testing + require Membrane.Pad, as: Pad + require Membrane.Child, as: Child + test "Element that is not member of any crash group crashed when pipeline is in playing state" do Process.flag(:trap_exit, true) @@ -199,6 +203,194 @@ defmodule Membrane.Integration.ChildCrashTest do assert_pipeline_crash_group_down(pipeline_pid, 2) end + defmodule DynamicElement do + use Membrane.Endpoint + + def_input_pad :input, + accepted_format: _any, + availability: :on_request, + flow_control: :push + + def_output_pad :output, + accepted_format: _any, + availability: :on_request, + flow_control: :push + + @impl true + def handle_playing(_ctx, _opts) do + {[notify_parent: :playing], %{}} + end + end + + defmodule Bin do + use Membrane.Bin + + alias Membrane.Integration.ChildCrashTest.DynamicElement + require Membrane.Pad, as: Pad + + def_input_pad :input, + accepted_format: _any, + availability: :on_request + + def_output_pad :output, + accepted_format: _any, + availability: :on_request + + def_options do_internal_link: [spec: boolean(), default: true] + + @impl true + def handle_playing(_ctx, _opts) do + {[notify_parent: :playing], %{}} + end + + @impl true + def handle_pad_added(Pad.ref(direction, _id) = pad, _ctx, state) do + spec = + if state.do_internal_link do + [ + {child(direction, DynamicElement), group: :group, crash_group_mode: :temporary}, + get_child(Child.ref(direction, group: :group)) |> bin_output(pad) + ] + else + [] + end + + {[spec: spec, notify_parent: :handle_pad_added], state} + end + end + + defmodule OuterBin do + use Membrane.Bin + + alias Membrane.Integration.ChildCrashTest.Bin + + def_output_pad :output, + accepted_format: _any, + availability: :on_request + + @impl true + def handle_pad_added(pad, _ctx, state) do + spec = child(:bin, Bin) |> bin_output(pad) + {[spec: spec], state} + end + + @impl true + def handle_child_notification(notification, :bin, _ctx, state) do + {[notify_parent: {:child_notification, notification}], state} + end + + @impl true + def handle_child_pad_removed(child, pad, _ctx, state) do + {[notify_parent: {:child_pad_removed, child, pad}], state} + end + end + + describe "When crash group inside a bin crashes" do + test "bin removes a pad" do + pipeline = + Testing.Pipeline.start_link_supervised!( + spec: child(:bin, Bin) |> child(:element, DynamicElement), + raise_on_child_pad_removed?: false + ) + + assert_pipeline_notified(pipeline, :element, :playing) + + child_ref = Child.ref(:output, group: :group) + bin_child_pid = Testing.Pipeline.get_child_pid!(pipeline, [:bin, child_ref]) + Process.exit(bin_child_pid, :kill) + + assert_child_pad_removed(pipeline, :bin, Pad.ref(:output, _id)) + + Testing.Pipeline.terminate(pipeline) + end + + test "spec is updated" do + pipeline = + Testing.Pipeline.start_link_supervised!( + spec: + child(:first_bin, %Bin{do_internal_link: false}) + |> child(:second_bin, Bin) + |> child(:element, DynamicElement), + raise_on_child_pad_removed?: false + ) + + assert_pipeline_notified(pipeline, :second_bin, :handle_pad_added) + refute_pipeline_notified(pipeline, :element, :playing) + + child_ref = Child.ref(:output, group: :group) + + Testing.Pipeline.get_child_pid!(pipeline, [:second_bin, child_ref]) + |> Process.exit(:kill) + + assert_child_pad_removed(pipeline, :second_bin, Pad.ref(:output, _id)) + + Testing.Pipeline.execute_actions(pipeline, remove_children: :first_bin) + + assert_pipeline_notified(pipeline, :element, :playing) + assert_pipeline_notified(pipeline, :second_bin, :playing) + + Testing.Pipeline.terminate(pipeline) + end + + test "bin's parent's parent is notified, if should be" do + pipeline = + Testing.Pipeline.start_link_supervised!( + spec: + child(:bin, OuterBin) + |> child(:element, DynamicElement), + raise_on_child_pad_removed?: false + ) + + assert_pipeline_notified(pipeline, :element, :playing) + + inner_element_pid = + Testing.Pipeline.get_child_pid!( + pipeline, + [:bin, :bin, Child.ref(:output, group: :group)] + ) + + Process.exit(inner_element_pid, :kill) + + assert_child_pad_removed(pipeline, :bin, Pad.ref(:output, _id)) + assert_pipeline_notified(pipeline, :bin, {:child_pad_removed, :bin, Pad.ref(:output, _id)}) + + Testing.Pipeline.terminate(pipeline) + end + end + + test "When crash group crashes, another crash group from this same spec is still living" do + children_definitions = + child(:first_bin, %Bin{do_internal_link: false}) + |> child(:second_bin, Bin) + |> child(:element, DynamicElement) + + spec = [ + {children_definitions, group: :a, crash_group_mode: :temporary}, + {children_definitions, group: :b, crash_group_mode: :temporary} + ] + + pipeline = + Testing.Pipeline.start_link_supervised!( + spec: spec, + raise_on_child_pad_removed?: false + ) + + assert_pipeline_notified(pipeline, Child.ref(:second_bin, group: :a), :handle_pad_added) + + Testing.Pipeline.get_child_pid!(pipeline, Child.ref(:second_bin, group: :a)) + |> Process.exit(:kill) + + assert_pipeline_crash_group_down(pipeline, :a) + refute_pipeline_notified(pipeline, Child.ref(:second_bin, group: :b), :playing) + + Testing.Pipeline.execute_actions(pipeline, remove_children: Child.ref(:first_bin, group: :b)) + + assert_pipeline_notified(pipeline, Child.ref(:second_bin, group: :b), :playing) + assert_pipeline_notified(pipeline, Child.ref(:element, group: :b), :playing) + + Testing.Pipeline.terminate(pipeline) + end + defp assert_pid_alive(pid) do refute_receive {:EXIT, ^pid, _} end diff --git a/test/membrane/integration/child_pad_removed_test.exs b/test/membrane/integration/child_pad_removed_test.exs new file mode 100644 index 000000000..1f7f97fd1 --- /dev/null +++ b/test/membrane/integration/child_pad_removed_test.exs @@ -0,0 +1,202 @@ +defmodule Membrane.Integration.ChildPadRemovedTest do + use ExUnit.Case, async: false + + alias Membrane.Testing + + require Membrane.Pad, as: Pad + + defmodule DynamicSource do + use Membrane.Source + def_output_pad :output, flow_control: :push, availability: :on_request, accepted_format: _any + end + + defmodule DynamicSink do + use Membrane.Sink + def_input_pad :input, flow_control: :push, availability: :on_request, accepted_format: _any + def_options test_process: [spec: pid()] + + @impl true + def handle_init(ctx, opts) do + send(opts.test_process, {:init, ctx.name}) + {[], Map.from_struct(opts)} + end + + @impl true + def handle_pad_added(_pad, ctx, state) do + send(state.test_process, {:pad_added, ctx.name}) + {[], state} + end + + @impl true + def handle_pad_removed(_pad, ctx, state) do + send(state.test_process, {:pad_removed, ctx.name}) + {[], state} + end + end + + defmodule StaticSink do + use Membrane.Sink + def_input_pad :input, flow_control: :push, accepted_format: _any + def_options test_process: [spec: pid()] + + @impl true + def handle_init(ctx, opts) do + send(opts.test_process, {:init, ctx.name}) + {[], Map.from_struct(opts)} + end + end + + defmodule DynamicBin do + use Membrane.Bin + + require Membrane.Pad, as: Pad + + def_output_pad :output, availability: :on_request, accepted_format: _any + def_options test_process: [spec: pid()] + + @impl true + def handle_parent_notification({:execute_actions, actions}, _ctx, state) do + {actions, state} + end + + @impl true + def handle_pad_added(pad, _ctx, state) do + spec = + child(:source, DynamicSource) + |> via_out(Pad.ref(:output, 1)) + |> bin_output(pad) + + {[spec: spec], state} + end + end + + defmodule Pipeline do + use Membrane.Pipeline + + @impl true + def handle_init(_ctx, opts) do + %{bin: bin, sink: sink, actions_on_child_removed_pad: actions, test_process: test_process} = + Map.new(opts) + + spec = child(:bin, bin) |> child(:sink, sink) + {[spec: spec], %{actions: actions, test_process: test_process}} + end + + @impl true + def handle_info({:execute_actions, actions}, _ctx, state) do + {actions, state} + end + + @impl true + def handle_child_pad_removed(:bin, pad, _ctx, %{actions: actions} = state) do + send(state.test_process, {:child_pad_removed, :bin, pad}) + {actions, state} + end + end + + defp start_pipeline!(bin, sink, actions_on_child_removed_pad \\ []) do + do_start_pipeline!(:start, bin, sink, actions_on_child_removed_pad) + end + + defp start_link_pipeline!(bin, sink, actions_on_child_removed_pad \\ []) do + do_start_pipeline!(:start_link, bin, sink, actions_on_child_removed_pad) + end + + defp do_start_pipeline!(function, bin, sink, actions) do + args = [ + Pipeline, + [ + bin: struct(bin, test_process: self()), + sink: struct(sink, test_process: self()), + actions_on_child_removed_pad: actions, + test_process: self() + ] + ] + + {:ok, _supervisor, pipeline} = apply(Membrane.Pipeline, function, args) + + pipeline + end + + defp execute_actions_in_bin(pipeline, actions) do + msg_to_bin = {:execute_actions, actions} + msg_to_pipeline = {:execute_actions, notify_child: {:bin, msg_to_bin}} + send(pipeline, msg_to_pipeline) + end + + defp assert_child_exists(pipeline, child_ref_path) do + assert {:ok, pid} = Testing.Pipeline.get_child_pid(pipeline, child_ref_path) + assert is_pid(pid) + end + + describe "when child-bin removes a pad" do + test "sibling is unlinked" do + for bin_actions <- [ + [remove_children: :source], + [remove_child_pad: {:source, Pad.ref(:output, 1)}] + ] do + pipeline = start_link_pipeline!(DynamicBin, DynamicSink) + + execute_actions_in_bin(pipeline, bin_actions) + + receive do + {:pad_added, :sink} -> + assert_receive {:pad_removed, :sink} + after + 500 -> + refute_received {:pad_removed, :sink} + end + + assert_receive {:child_pad_removed, :bin, Pad.ref(:output, _id)} + assert_child_exists(pipeline, :bin) + assert_child_exists(pipeline, :sink) + + Pipeline.terminate(pipeline, blocking?: true) + end + end + + test "sibling linked via static pad raises" do + for actions <- [ + [remove_children: :source], + [remove_child_pad: {:source, Pad.ref(:output, 1)}] + ] do + pipeline = start_pipeline!(DynamicBin, StaticSink) + + assert_receive {:init, :sink} + sink_pid = Testing.Pipeline.get_child_pid!(pipeline, :sink) + monitor_ref = Process.monitor(sink_pid) + + execute_actions_in_bin(pipeline, actions) + + assert_receive {:child_pad_removed, :bin, Pad.ref(:output, _id)} + + assert_receive {:DOWN, ^monitor_ref, :process, ^sink_pid, + {%Membrane.PadError{message: message}, _stacktrace}} + + assert message =~ ~r/Tried.*to.*unlink.*a.*static.*pad.*input.*/ + end + end + + @tag :target + test "and sibling linked via static pad is removed, pipeline is not raising" do + for bin_actions <- [ + [remove_children: :source], + [remove_child_pad: {:source, Pad.ref(:output, 1)}] + ] do + pipeline = start_link_pipeline!(DynamicBin, StaticSink, remove_children: :sink) + + assert_receive {:init, :sink} + sink_pid = Testing.Pipeline.get_child_pid!(pipeline, :sink) + monitor_ref = Process.monitor(sink_pid) + + execute_actions_in_bin(pipeline, bin_actions) + + assert_receive {:child_pad_removed, :bin, Pad.ref(:output, _id)} + assert_receive {:DOWN, ^monitor_ref, :process, ^sink_pid, _reason} + assert_child_exists(pipeline, :bin) + + Pipeline.terminate(pipeline, blocking?: true) + end + end + end +end diff --git a/test/membrane/integration/cutting_link_test.exs b/test/membrane/integration/cutting_link_test.exs new file mode 100644 index 000000000..e7e2f206c --- /dev/null +++ b/test/membrane/integration/cutting_link_test.exs @@ -0,0 +1,327 @@ +defmodule Membrane.Integration.CuttingLinksTest do + use Bunch + use ExUnit.Case, async: false + + import Membrane.ChildrenSpec + import Membrane.Testing.Assertions + + alias Membrane.Testing + + require Membrane.Pad, as: Pad + + defmodule Macros do + defmacro notify_on_playing() do + quote do + @impl true + def handle_playing(_ctx, state) do + {[notify_parent: :playing], state} + end + end + end + + defmacro notify_on_pad_removed() do + quote do + @impl true + def handle_pad_removed(pad, _ctx, state) do + {[notify_parent: {:pad_removed, pad}], state} + end + end + end + + defmacro forward_child_notification() do + quote do + @impl true + def handle_child_notification(msg, child, _ctx, state) do + {[notify_parent: {:child_notification, child, msg}], state} + end + end + end + end + + defmodule DynamicElement do + use Membrane.Endpoint + require Membrane.Integration.CuttingLinksTest.Macros, as: Macros + + def_input_pad :input, accepted_format: _any, flow_control: :push, availability: :on_request + def_output_pad :output, accepted_format: _any, flow_control: :push, availability: :on_request + + Macros.notify_on_playing() + Macros.notify_on_pad_removed() + end + + defmodule StaticSource do + use Membrane.Source + def_output_pad :output, accepted_format: _any, flow_control: :push + end + + defmodule StaticSink do + use Membrane.Sink + def_input_pad :input, accepted_format: _any, flow_control: :push + end + + defmodule DynamicBin do + use Membrane.Bin + require Membrane.Integration.CuttingLinksTest.Macros, as: Macros + + def_input_pad :input, accepted_format: _any, availability: :on_request + def_output_pad :output, accepted_format: _any, availability: :on_request + + Macros.notify_on_playing() + Macros.notify_on_pad_removed() + Macros.forward_child_notification() + + @impl true + def handle_init(_ctx, state) do + {[spec: child(:element, DynamicElement)], state} + end + + @impl true + def handle_pad_added(Pad.ref(direction, _id) = pad, _ctx, state) do + spec = + case direction do + :input -> bin_input(pad) |> get_child(:element) + :output -> get_child(:element) |> bin_output(pad) + end + + {[spec: spec], state} + end + end + + defmodule NestedBin do + use Membrane.Bin + require Membrane.Integration.CuttingLinksTest.Macros, as: Macros + + def_input_pad :input, accepted_format: _any, availability: :on_request + + Macros.notify_on_playing() + Macros.notify_on_pad_removed() + Macros.forward_child_notification() + + @impl true + def handle_init(_ctx, state) do + {[spec: child(:inner_bin, DynamicBin)], state} + end + + @impl true + def handle_pad_added(Pad.ref(:input, _id) = pad, _ctx, state) do + spec = + bin_input(pad) + |> via_in(pad, implicit_unlink?: false) + |> get_child(:inner_bin) + + {[spec: spec], state} + end + end + + describe "cutting link between 2 elements" do + test "with dynamic pads" do + pipeline = + Testing.Pipeline.start_link_supervised!( + spec: + child(:source, DynamicElement) + |> via_in(Pad.ref(:input, 1), implicit_unlink?: false) + |> child(:sink, DynamicElement) + ) + + assert_pipeline_notified(pipeline, :source, :playing) + assert_pipeline_notified(pipeline, :sink, :playing) + + Testing.Pipeline.execute_actions(pipeline, remove_children: :source) + refute_pipeline_notified(pipeline, :sink, {:pad_removed, _pad}) + + Testing.Pipeline.execute_actions(pipeline, remove_child_pad: {:sink, Pad.ref(:input, 1)}) + assert_pipeline_notified(pipeline, :sink, {:pad_removed, Pad.ref(:input, 1)}) + + Testing.Pipeline.terminate(pipeline) + end + + test "when `source` side has a static pad" do + pipeline = + Testing.Pipeline.start_link_supervised!( + spec: + child(:source, StaticSource) + |> via_in(Pad.ref(:input, 1), implicit_unlink?: false) + |> child(:sink, DynamicElement) + ) + + assert_pipeline_notified(pipeline, :sink, :playing) + + Testing.Pipeline.execute_actions(pipeline, remove_children: :source) + refute_pipeline_notified(pipeline, :sink, {:pad_removed, _pad}) + + Testing.Pipeline.execute_actions(pipeline, remove_child_pad: {:sink, Pad.ref(:input, 1)}) + assert_pipeline_notified(pipeline, :sink, {:pad_removed, Pad.ref(:input, 1)}) + + Testing.Pipeline.terminate(pipeline) + end + + test "when `sink` side has a static pad" do + pipeline = + Testing.Pipeline.start_supervised!( + spec: + child(:source, DynamicElement) + |> via_out(Pad.ref(:output, 1)) + |> via_in(:input, implicit_unlink?: false) + |> child(:sink, StaticSink) + ) + + assert_pipeline_notified(pipeline, :source, :playing) + + sink_pid = Testing.Pipeline.get_child_pid!(pipeline, :sink) + sink_monitor = Process.monitor(sink_pid) + + Testing.Pipeline.execute_actions(pipeline, remove_child_pad: {:source, Pad.ref(:output, 1)}) + refute_receive {:DOWN, ^sink_monitor, :process, ^sink_pid, _reason} + + pipeline_monitor = Process.monitor(pipeline) + Testing.Pipeline.execute_actions(pipeline, remove_child_pad: {:sink, :input}) + + assert_receive {:DOWN, ^pipeline_monitor, :process, ^pipeline, {:shutdown, :child_crash}} + end + end + + test "cutting link between 2 bins with dynamic pads" do + pipeline = + Testing.Pipeline.start_link_supervised!( + spec: + child(:source_bin, DynamicBin) + |> via_in(Pad.ref(:input, 1), implicit_unlink?: false) + |> child(:sink_bin, DynamicBin) + ) + + assert_pipeline_notified(pipeline, :source_bin, :playing) + assert_pipeline_notified(pipeline, :source_bin, {:child_notification, :element, :playing}) + assert_pipeline_notified(pipeline, :sink_bin, :playing) + assert_pipeline_notified(pipeline, :sink_bin, {:child_notification, :element, :playing}) + + Testing.Pipeline.execute_actions(pipeline, remove_children: :source_bin) + refute_pipeline_notified(pipeline, :sink_bin, {:pad_removed, _pad}) + + refute_pipeline_notified( + pipeline, + :sink_bin, + {:child_notification, :element, {:pad_removed, _pad}} + ) + + Testing.Pipeline.execute_actions(pipeline, remove_child_pad: {:sink_bin, Pad.ref(:input, 1)}) + assert_pipeline_notified(pipeline, :sink_bin, {:pad_removed, Pad.ref(:input, 1)}) + + assert_pipeline_notified( + pipeline, + :sink_bin, + {:child_notification, :element, {:pad_removed, Pad.ref(:input, _id)}} + ) + + Testing.Pipeline.terminate(pipeline) + end + + describe "cutting link between bin's child and bin's pad" do + test "triggered by unlinking bin" do + pipeline = + Testing.Pipeline.start_link_supervised!( + spec: + child(:element, DynamicElement) + |> via_out(Pad.ref(:output, 1)) + |> via_in(Pad.ref(:input, 1)) + |> child(:bin, NestedBin) + ) + + assert_pipeline_notified(pipeline, :element, :playing) + assert_pipeline_notified(pipeline, :bin, :playing) + assert_pipeline_notified(pipeline, :bin, {:child_notification, :inner_bin, :playing}) + + assert_pipeline_notified( + pipeline, + :bin, + {:child_notification, :inner_bin, {:child_notification, :element, :playing}} + ) + + Testing.Pipeline.execute_actions(pipeline, remove_child_pad: {:element, Pad.ref(:output, 1)}) + + assert_pipeline_notified(pipeline, :bin, {:pad_removed, Pad.ref(:input, 1)}) + + refute_pipeline_notified( + pipeline, + :bin, + {:child_notification, :inner_bin, {:pad_removed, _pad}} + ) + + refute_pipeline_notified( + pipeline, + :bin, + {:child_notification, :inner_bin, {:child_notification, :element, {:pad_removed, _pad}}} + ) + + Testing.Pipeline.execute_actions(pipeline, remove_child_pad: {:bin, Pad.ref(:input, 1)}) + + assert_pipeline_notified( + pipeline, + :bin, + {:child_notification, :inner_bin, {:pad_removed, Pad.ref(:input, _id)}} + ) + + assert_pipeline_notified( + pipeline, + :bin, + {:child_notification, :inner_bin, + {:child_notification, :element, {:pad_removed, Pad.ref(:input, _id)}}} + ) + end + + @tag :skip + test "triggered by pipeline removing bin's pad" do + test "triggered by unlinking bin" do + pipeline = + Testing.Pipeline.start_link_supervised!( + spec: + child(:element, DynamicElement) + |> via_out(Pad.ref(:output, 1)) + |> via_in(Pad.ref(:input, 1)) + |> child(:bin, NestedBin) + ) + + assert_pipeline_notified(pipeline, :element, :playing) + assert_pipeline_notified(pipeline, :bin, :playing) + assert_pipeline_notified(pipeline, :bin, {:child_notification, :inner_bin, :playing}) + + assert_pipeline_notified( + pipeline, + :bin, + {:child_notification, :inner_bin, {:child_notification, :element, :playing}} + ) + + Testing.Pipeline.execute_actions(pipeline, remove_child_pad: {:bin, Pad.ref(:input, 1)}) + + assert_pipeline_notified(pipeline, :bin, {:pad_removed, Pad.ref(:input, 1)}) + + refute_pipeline_notified( + pipeline, + :bin, + {:child_notification, :inner_bin, {:pad_removed, _pad}} + ) + + refute_pipeline_notified( + pipeline, + :bin, + {:child_notification, :inner_bin, {:child_notification, :element, {:pad_removed, _pad}}} + ) + + Testing.Pipeline.execute_actions(pipeline, remove_child_pad: {:bin, Pad.ref(:input, 1)}) + + assert_pipeline_notified( + pipeline, + :bin, + {:child_notification, :inner_bin, {:pad_removed, Pad.ref(:input, _id)}} + ) + + assert_pipeline_notified( + pipeline, + :bin, + {:child_notification, :inner_bin, + {:child_notification, :element, {:pad_removed, Pad.ref(:input, _id)}}} + ) + end + + end + end +end diff --git a/test/membrane/integration/linking_test.exs b/test/membrane/integration/linking_test.exs index 9c59fda24..d404d04a3 100644 --- a/test/membrane/integration/linking_test.exs +++ b/test/membrane/integration/linking_test.exs @@ -472,7 +472,7 @@ defmodule Membrane.Integration.LinkingTest do assert_start_of_stream(pipeline, :sink) end - test "Parent successfully unlinks children with dynamic pads using :remove_link action" do + test "Parent successfully unlinks children with dynamic pads using :remove_child_pad action" do spec = [ child(:source, __MODULE__.Element), @@ -490,8 +490,8 @@ defmodule Membrane.Integration.LinkingTest do for pad_id <- 1..10 do actions = if rem(pad_id, 2) == 0, - do: [remove_link: {:source, Pad.ref(:output, pad_id)}], - else: [remove_link: {:sink, Pad.ref(:input, pad_id)}] + do: [remove_child_pad: {:source, Pad.ref(:output, pad_id)}], + else: [remove_child_pad: {:sink, Pad.ref(:input, pad_id)}] Testing.Pipeline.execute_actions(pipeline, actions) @@ -503,6 +503,70 @@ defmodule Membrane.Integration.LinkingTest do end end + describe "Spec shouldn't wait on links with" do + defmodule LazyBin do + use Membrane.Bin + + def_output_pad :output, + availability: :on_request, + accepted_format: _any + + @impl true + def handle_playing(_ctx, state) do + {[notify_parent: :playing], state} + end + end + + defmodule Sink do + use Membrane.Sink + + def_input_pad :input, + accepted_format: _any + + @impl true + def handle_init(_ctx, _opts) do + {[notify_parent: :init], %{}} + end + end + + test "removed child" do + spec = child(:bin, LazyBin) |> child(:sink, Sink) + pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) + + assert_pipeline_notified(pipeline, :sink, :init) + sink_pid = Testing.Pipeline.get_child_pid!(pipeline, :sink) + monitor_ref = Process.monitor(sink_pid) + + Testing.Pipeline.execute_actions(pipeline, remove_children: :sink) + + assert_receive {:DOWN, ^monitor_ref, :process, ^sink_pid, :normal} + assert_pipeline_notified(pipeline, :bin, :playing) + + assert :ok == Testing.Pipeline.terminate(pipeline, blocking?: true) + end + + test "crashed child" do + sink_ref = Child.ref(:sink, group: :group) + + spec = [ + {child(:sink, Sink), group: :group, crash_group_mode: :temporary}, + child(:bin, LazyBin) |> get_child(sink_ref) + ] + + pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) + + assert_pipeline_notified(pipeline, sink_ref, :init) + + Testing.Pipeline.get_child_pid!(pipeline, sink_ref) + |> Process.exit(:kill) + + assert_pipeline_crash_group_down(pipeline, :group) + assert_pipeline_notified(pipeline, :bin, :playing) + + assert :ok == Testing.Pipeline.terminate(pipeline, blocking?: true) + end + end + defp assert_link_removed(pipeline, id) do assert_pipeline_notified(pipeline, :source, {:handle_pad_removed, Pad.ref(:output, ^id)}) assert_pipeline_notified(pipeline, :sink, {:handle_pad_removed, Pad.ref(:input, ^id)}) diff --git a/test/membrane/testing/pipeline_test.exs b/test/membrane/testing/pipeline_test.exs index 7b2c92307..42b10b16f 100644 --- a/test/membrane/testing/pipeline_test.exs +++ b/test/membrane/testing/pipeline_test.exs @@ -22,10 +22,21 @@ defmodule Membrane.Testing.PipelineTest do test "works with :default implementation" do elements = [elem: Elem, elem2: Elem] links = [get_child(:elem) |> get_child(:elem2)] - options = [module: :default, spec: elements ++ links, test_process: nil] + + options = [ + module: :default, + spec: elements ++ links, + test_process: nil, + raise_on_child_pad_removed?: false + ] + assert {[spec: spec], state} = Pipeline.handle_init(%{}, options) - assert state == %Pipeline.State{module: nil, test_process: nil} + assert state == %Pipeline.State{ + module: nil, + test_process: nil, + raise_on_child_pad_removed?: false + } assert spec == elements ++ links end @@ -34,7 +45,12 @@ defmodule Membrane.Testing.PipelineTest do links = [child(:elem, Elem) |> child(:elem2, Elem)] options = [module: :default, spec: links, test_process: nil] assert {[spec: spec], state} = Pipeline.handle_init(%{}, options) - assert state == %Pipeline.State{module: nil, test_process: nil} + + assert state == %Pipeline.State{ + module: nil, + test_process: nil, + raise_on_child_pad_removed?: true + } assert spec == links end @@ -47,7 +63,8 @@ defmodule Membrane.Testing.PipelineTest do assert state == %Pipeline.State{ custom_pipeline_state: :state, module: MockPipeline, - test_process: nil + test_process: nil, + raise_on_child_pad_removed?: nil } end end @@ -57,7 +74,12 @@ defmodule Membrane.Testing.PipelineTest do links = [child(:elem, Elem) |> child(:elem2, Elem)] options = [module: :default, spec: links, test_process: nil] assert {[spec: spec], state} = Pipeline.handle_init(%{}, options) - assert state == %Pipeline.State{module: nil, test_process: nil} + + assert state == %Pipeline.State{ + module: nil, + test_process: nil, + raise_on_child_pad_removed?: true + } assert spec == links end @@ -85,7 +107,7 @@ defmodule Membrane.Testing.PipelineTest do end end - defmodule Bin do + defmodule TripleElementBin do use Membrane.Bin @impl true @@ -116,9 +138,9 @@ defmodule Membrane.Testing.PipelineTest do end spec = [ - child(:bin_1, Bin), - child(:bin_2, Bin), - child(:bin_3, Bin) + child(:bin_1, TripleElementBin), + child(:bin_2, TripleElementBin), + child(:bin_3, TripleElementBin) ] pipeline = Pipeline.start_supervised!(spec: spec) @@ -169,4 +191,80 @@ defmodule Membrane.Testing.PipelineTest do assert {:error, :pipeline_not_alive} = Pipeline.get_child_pid(pipeline, :bin_1) end + + describe "Testing.Pipeline on handle_child_pad_removed" do + defmodule DynamicElement do + use Membrane.Endpoint + + def_input_pad :input, + accepted_format: _any, + availability: :on_request, + flow_control: :push + + def_output_pad :output, + accepted_format: _any, + availability: :on_request, + flow_control: :push + + @impl true + def handle_pad_added(pad, _ctx, state) do + {[notify_parent: {:pad_added, pad}], state} + end + end + + defmodule Bin do + use Membrane.Bin + + alias Membrane.Testing.PipelineTest.DynamicElement + + require Membrane.Pad, as: Pad + + def_output_pad :output, + accepted_format: _any, + availability: :on_request + + @impl true + def handle_pad_added(pad, _ctx, state) do + spec = + child(:element, DynamicElement) + |> via_out(Pad.ref(:output, 1)) + |> bin_output(pad) + + {[spec: spec], state} + end + + @impl true + def handle_parent_notification(:remove_child_pad, _ctx, state) do + {[remove_child_pad: {:element, Pad.ref(:output, 1)}], state} + end + end + + test "raises with option `:raise_on_child_pad_removed?` set to default" do + spec = + child(:bin, Bin) + |> child(:sink, DynamicElement) + + pipeline = Pipeline.start_supervised!(spec: spec) + monitor_ref = Process.monitor(pipeline) + + assert_pipeline_notified(pipeline, :sink, {:pad_added, _pad}) + Pipeline.execute_actions(pipeline, notify_child: {:bin, :remove_child_pad}) + + assert_receive {:DOWN, ^monitor_ref, :process, _pid, _reason} + end + + test "doesn't raise with option `raise_on_child_pad_removed?: false`" do + spec = + child(:bin, Bin) + |> child(:sink, DynamicElement) + + pipeline = Pipeline.start_supervised!(spec: spec, raise_on_child_pad_removed?: false) + monitor_ref = Process.monitor(pipeline) + + assert_pipeline_notified(pipeline, :sink, {:pad_added, _pad}) + Pipeline.execute_actions(pipeline, notify_child: {:bin, :remove_child_pad}) + + refute_receive {:DOWN, ^monitor_ref, :process, _pid, _reason} + end + end end