EventBroker (Anoma v0.29.0)

I am the EventBroker Application Module.

I startup the PubSub system as an own OTP application. Moreover I provide all the API necessary for the use of the system. I contain all public functionality provided by the Broker and the Registry.

Public API

I have the following public functionality:

Summary

Types

I am a filter dependency specification, I am a list of filter specs listed in the order in which the filter agents implementing said specs should be subscribed to one another.

Functions

I am the Event Broker event function.

I am a subscription function specifically for self()

I am the unsubscription function specifically for self()

Types

Link to this type

filter_spec_list()

@type filter_spec_list() :: [struct()]

I am a filter dependency specification, I am a list of filter specs listed in the order in which the filter agents implementing said specs should be subscribed to one another.

Functions

Link to this function

event(event, broker \\ EventBroker.Broker)

@spec event(EventBroker.Event.t(), atom()) :: :ok

I am the Event Broker event function.

I process any incoming events by sending them to all of Broker subscribers using the send/2 functionality.

Link to this function

subscribe(pid, filter_spec_list, registry \\ EventBroker.Registry)

@spec subscribe(pid(), filter_spec_list(), atom()) :: :ok | String.t()

I am the subscription function.

Given a PID and a list of filter specs, I ensure that all the actors corresponding to each spec given in the list is launched and subscribed to each other in the appropriate order - e.g. given [spec1, spec2] I ensure that agents implemented spec1 and spec2 are launched and that the former is subscribed to the top broker, while the latter is subscribed to the former.

I also do this in a minimal fashion, that is, if some starting subchain of filter spec dependency has already been registered, I launch the minimal chain remaining to build the full dependency.

Note that each filter actor is hence not only determined by the filtering functionality it has but also on the chain of dependencies it is spawned with. Hence [spec1, spec2] corresponds to an agent different from [spec2].

Afterwards, I subscribe the given PID to the final filter agent in the dependency chain and register all the new agents in the map by putting the filter agent PIDs as values to their dependency filter specification.

If I notice that any of the filter structures do not have an appropriate public filter functionality exposed, I return the list of such modules back to the user and do nothing with respect to agent-spawning or registration.

Filter Agent spawning is handled via DynamicSupervisor.

Link to this function

subscribe_me(filter_spec_list, registry \\ EventBroker.Registry)

@spec subscribe_me(filter_spec_list(), atom()) :: :ok | String.t()

I am a subscription function specifically for self()

I call subscribe/2 where the first argument is self()

Link to this function

unsubscribe(pid, filter_spec_list, registry \\ EventBroker.Registry)

@spec unsubscribe(pid(), filter_spec_list(), atom()) :: :ok

I am the unsubscription function.

Given a PID and a list of filter specs, I get the PID of the related filter agent by looking in my state, then ask it to unsubscribe the given PID from it. In case the agent has other subscribers, I return the base state.

Otherwise, it will shut down, prompting to recursively send unsubscription requests to its parent filters and processing their termination appropriately in a synchronous manner.

After all the requests have been sent and terminations recorded, I remove all agents which have shut down from my registry map and return :ok

Link to this function

unsubscribe_me(filter_spec_list, registry \\ EventBroker.Registry)

@spec unsubscribe_me(filter_spec_list(), atom()) :: :ok

I am the unsubscription function specifically for self()

I call unsubscribe/2 where the first argument is self()