%%%----------------------------------------------------------------------------- %%% @copyright (C) 2011-2019, 2600Hz %%% @doc Behaviour for setting up an AMQP listener. %%% Add/remove responders for `Event-Cat'/`Event-Name' pairs. Each responder %%% corresponds to a module that has defined a `handle/1' function, receiving %%% the {@link kz_json:object()} from the AMQP request. %%% %%% Options: %%%
%%%
`{bindings, [{atom(), kz_term:proplist()}, ...]}'
%%%
The type of bindings, with optional properties to pass along.
%%% %%%
`{responders, [{Responder, [{<<"event-category">>, <<"event-name">>}, ...]}]'
%%%
`Responder' is the module name to call `handle_req/2' on for those category/name combos. `Responder' can also be `{module, function}', %%% to call `module:function/2' instead of `handle_req/2'. %%% `Responder' can optionally define a `function/3' (or `handle_req/3') that will be called with the 3rd argument %%% consisting of the delivery options including exchange and routing key. %%%
%%% %%%
`{queue_name, <<"some name">>}'
%%%
Optional, if you want a named queue.
%%% %%%
`{queue_options, [{key, value}]}'
%%%
Optional, if the queue requires different parameters.
%%% %%%
`{consume_options, [{key, value}]}'
%%%
Optional, if the consumption requires special parameters.
%%% %%%
`{basic_qos, integer()}'
%%%
Optional, if QoS is being set on this queue.
%%% %%%
`{broker | broker_tag, kz_term:ne_binary()}'
%%%
Optional, for binding to specific brokers.
%%% %%%
`{declare_exchanges, declare_exchanges()}'
%%%
Optional, for declaring dynamic exchanges used only in this connection.
%%%
%%% %%% @author James Aimonetti %%% @author Karl Anderson %%% @end %%%----------------------------------------------------------------------------- -module(gen_listener). -behaviour(gen_server). -export([start_link/3 ,start_link/4 ,start_link/5 ]). -export([start_listener/2]). -export([queue_name/1 ,bindings/1 ,responders/1 ,is_consuming/1 ,routing_key_used/1 ]). -export([add_queue/4 ,other_queues/1 ,rm_queue/2 ]). %% gen_server callbacks -export([init/1 ,handle_call/3 ,handle_cast/2 ,handle_info/2 ,terminate/2 ,code_change/3 ,format_status/2 ]). %% gen_server API -export([call/2 ,call/3 ,cast/2 ,reply/2 ,enter_loop/3, enter_loop/4, enter_loop/5 ]). %% gen_listener API -export([add_responder/3 ,rm_responder/2 ,rm_responder/3 ]). -export([add_binding/2, add_binding/3 ,b_add_binding/2, b_add_binding/3 ,rm_binding/2, rm_binding/3 ]). -export([ack/2 ,nack/2 ]). -export([execute/4 ,execute/3 ,execute/2 ]). -export([federated_event/4]). -export([delayed_cast/3 ]). -export([distribute_event/3]). -include("listener_types.hrl"). -define(SERVER, ?MODULE). -define(SERVER_RETRY_PERIOD, 30 * ?MILLISECONDS_IN_SECOND). -define(TIMEOUT_RETRY_CONN, 5 * ?MILLISECONDS_IN_SECOND). -define(CALLBACK_TIMEOUT_MSG, 'callback_timeout'). -define(BIND_WAIT, 100). -type module_state() :: any(). -type federator_listener() :: {kz_term:ne_binary(), pid()}. -type federator_listeners() :: [federator_listener()]. -record(state, {queue :: kz_term:api_binary() ,is_consuming = 'false' :: boolean() ,responders = [] :: listener_utils:responders() %% {{EvtCat, EvtName}, Module} ,bindings = [] :: bindings() %% {authentication, [{key, value},...]} ,params = [] :: kz_term:proplist() ,module :: atom() ,module_state :: module_state() ,module_timeout_ref :: kz_term:api_reference() % when the client sets a timeout, gen_listener calls shouldn't negate it, only calls that pass through to the client ,other_queues = [] :: [{kz_term:ne_binary(), {kz_term:proplist(), kz_term:proplist()}}] %% {QueueName, {kz_term:proplist(), kz_term:proplist()}} ,federators = [] :: federator_listeners() ,waiting_federators = [] :: list() ,self = self() :: pid() ,consumer_key = kz_amqp_channel:consumer_pid() ,consumer_tags = [] :: kz_term:binaries() ,handle_event_mfa = 'undefined' :: mfa() | 'undefined' ,auto_ack = 'false' :: boolean() }). -type state() :: #state{}. -type deliver() :: {basic_deliver(), amqp_basic()}. -type callback_datum() :: {'server', pid()} | {'queue', kz_term:api_binary()} | {'other_queues', kz_term:ne_binaries()}. -type callback_data() :: kz_term:proplist() | [callback_datum()]. -export_type([handle_event_return/0 ,binding/0 ,bindings/0 ,basic_deliver/0 ,callback_data/0 ]). %%%============================================================================= %%% API %%%============================================================================= -callback init(any()) -> {'ok', module_state()} | {'ok', module_state(), timeout() | 'hibernate'} | {'stop', any()} | 'ignore'. -type handle_call_return() :: {'reply', any(), module_state()} | {'reply', any(), module_state(), timeout() | 'hibernate'} | {'noreply', module_state()} | {'noreply', module_state(), timeout() | 'hibernate'} | {'stop', any(), any(), module_state()} | {'stop', any(), module_state()}. -callback handle_call(any(), {pid(), any()}, module_state()) -> handle_call_return(). -type handle_cast_return() :: {'noreply', module_state()} | {'noreply', module_state(), timeout() | 'hibernate'} | {'stop', any(), module_state()}. -callback handle_cast(any(), module_state()) -> handle_cast_return(). -type handle_info_return() :: {'noreply', module_state()} | {'noreply', module_state(), timeout() | 'hibernate'} | {'stop', any(), module_state()}. -callback handle_info(timeout() | any(), module_state()) -> handle_info_return(). -type handle_event_return() :: 'ignore' | {'ignore', module_state()} | {'reply', kz_term:proplist()} | {'reply', kz_term:proplist(), module_state()}. -callback handle_event(kz_json:object(), module_state()) -> handle_event_return(). -callback handle_event(kz_json:object(), basic_deliver(), module_state()) -> handle_event_return(). -callback handle_event(kz_json:object(), basic_deliver(), amqp_basic(), module_state()) -> handle_event_return(). -callback terminate('normal' | 'shutdown' | {'shutdown', any()} | any(), module_state()) -> any(). -callback code_change(any() | {'down', any()}, module_state(), any()) -> {'ok', module_state()} | {'error', any()}. -optional_callbacks([handle_event/2, handle_event/3, handle_event/4]). %%------------------------------------------------------------------------------ %% @doc %% @end %%------------------------------------------------------------------------------ -spec start_link(atom(), start_params(), list()) -> kz_types:startlink_ret(). start_link(Module, Params, InitArgs) when is_atom(Module), is_list(Params), is_list(InitArgs) -> gen_server:start_link(?SERVER, [Module, Params, InitArgs], []). -spec start_link(kz_types:gen_server_name() | atom(), atom() | start_params(), start_params() | list(), kz_types:gen_server_options() | list()) -> kz_types:startlink_ret(). start_link(Module, Params, InitArgs, Options) when is_atom(Module), is_list(Params), is_list(InitArgs), is_list(Options) -> gen_server:start_link(?MODULE, [Module, Params, InitArgs], Options); start_link(Name, Module, Params, InitArgs) when is_atom(Module), is_list(Params), is_list(InitArgs) -> gen_server:start_link(Name, ?MODULE, [Module, Params, InitArgs], []). -spec start_link(kz_types:gen_server_name(), atom(), start_params(), list(), kz_types:gen_server_options()) -> kz_types:startlink_ret(). start_link(Name, Module, Params, InitArgs, Options) when is_atom(Module), is_list(Params), is_list(InitArgs), is_list(Options) -> gen_server:start_link(Name, ?MODULE, [Module, Params, InitArgs], Options). -spec queue_name(kz_types:server_ref()) -> kz_term:api_ne_binary(). queue_name(Srv) -> gen_server:call(Srv, 'queue_name'). -spec is_consuming(kz_types:server_ref()) -> boolean(). is_consuming(Srv) -> gen_server:call(Srv, 'is_consuming'). -spec responders(kz_types:server_ref()) -> listener_utils:responders(). responders(Srv) -> gen_server:call(Srv, 'responders'). -spec bindings(kz_types:server_ref()) -> bindings(). bindings(Srv) -> gen_server:call(Srv, 'bindings'). -spec routing_key_used(basic_deliver()) -> kz_term:ne_binary(). routing_key_used(#'basic.deliver'{routing_key=RoutingKey}) -> kz_term:to_binary(RoutingKey). -spec ack(kz_types:server_ref(), basic_deliver()) -> 'ok'. ack(Srv, Delivery) -> gen_server:cast(Srv, {'ack', Delivery}). -spec nack(kz_types:server_ref(), basic_deliver()) -> 'ok'. nack(Srv, Delivery) -> gen_server:cast(Srv, {'nack', Delivery}). %%------------------------------------------------------------------------------ %% @doc API functions that mirror `gen_server:call,cast,reply'. %% @end %%------------------------------------------------------------------------------ -spec call(kz_types:server_ref(), any()) -> any(). call(Name, Request) -> gen_server:call(Name, {'$client_call', Request}). -spec call(kz_types:server_ref(), any(), timeout()) -> any(). call(Name, Request, Timeout) -> gen_server:call(Name, {'$client_call', Request}, Timeout). -spec cast(kz_types:server_ref(), any()) -> 'ok'. cast(Name, Request) -> gen_server:cast(Name, {'$client_cast', Request}). -spec delayed_cast(kz_types:server_ref(), any(), pos_integer()) -> 'ok'. delayed_cast(Name, Request, Wait) when is_integer(Wait), Wait > 0 -> _P = kz_util:spawn( fun() -> kz_util:put_callid(?MODULE), timer:sleep(Wait), gen_server:cast(Name, Request) end), 'ok'. -spec reply(kz_term:pid_ref(), any()) -> no_return(). reply(From, Msg) -> gen_server:reply(From, Msg). -type server_name() :: {'global' | 'local', atom()} | pid(). -spec enter_loop(atom(), list(), any()) -> no_return(). enter_loop(Module, Options, ModuleState) -> enter_loop(Module, Options, ModuleState, self(), 'infinity'). -spec enter_loop(atom(), list(), any(), timeout() | server_name()) -> no_return(). enter_loop(Module, Options, ModuleState, {Scope, _Name}=ServerName) when Scope =:= 'local' orelse Scope =:= 'global' -> enter_loop(Module, Options, ModuleState, ServerName, 'infinity'); enter_loop(Module, Option, ModuleState, Timeout) -> enter_loop(Module, Option, ModuleState, self(), Timeout). -spec enter_loop(atom(), list(), any(), server_name(), timeout()) -> no_return(). enter_loop(Module, Options, ModuleState, ServerName, Timeout) -> {'ok', MyState} = init_state([Module, Options, ModuleState]), gen_server:enter_loop(?MODULE, [], MyState, ServerName, Timeout). -spec add_responder(kz_types:server_ref(), responder_callback(), responder_callback_mapping() | responder_callback_mappings()) -> 'ok'. add_responder(Srv, Responder, {_,_}=Key) -> add_responder(Srv, Responder, [Key]); add_responder(Srv, Responder, [{_,_}|_] = Keys) -> gen_server:cast(Srv, {'add_responder', Responder, Keys}). %%------------------------------------------------------------------------------ %% @doc Removes responder from queue. %% Empty list removes all. %% @end %%------------------------------------------------------------------------------ -spec rm_responder(kz_types:server_ref(), responder_callback()) -> 'ok'. rm_responder(Srv, Responder) -> rm_responder(Srv, Responder, []). -spec rm_responder(kz_types:server_ref(), responder_callback(), responder_callback_mappings()) -> 'ok'. rm_responder(Srv, Responder, {_,_}=Key) -> rm_responder(Srv, Responder, [Key]); rm_responder(Srv, Responder, Keys) -> gen_server:cast(Srv, {'rm_responder', Responder, Keys}). -spec add_binding(kz_types:server_ref(), binding() | kz_term:ne_binary() | atom()) -> 'ok'. add_binding(Srv, {Binding, Props}) when is_list(Props) andalso (is_atom(Binding) orelse is_binary(Binding) ) -> gen_server:cast(Srv, {'add_binding', kz_term:to_binary(Binding), Props}); add_binding(Srv, Binding) when is_binary(Binding) orelse is_atom(Binding) -> gen_server:cast(Srv, {'add_binding', kz_term:to_binary(Binding), []}). -spec add_binding(kz_types:server_ref(), kz_term:ne_binary() | atom(), kz_term:proplist()) -> 'ok'. add_binding(Srv, Binding, Props) when is_binary(Binding) orelse is_atom(Binding) -> gen_server:cast(Srv, {'add_binding', kz_term:to_binary(Binding), Props}). -spec b_add_binding(kz_types:server_ref(), binding() | kz_term:ne_binary() | atom()) -> 'ok'. b_add_binding(Srv, {Binding, Props}) when is_list(Props) ,(is_atom(Binding) orelse is_binary(Binding) ) -> gen_server:call(Srv, {'add_binding', kz_term:to_binary(Binding), Props}); b_add_binding(Srv, Binding) when is_binary(Binding) orelse is_atom(Binding) -> gen_server:call(Srv, {'add_binding', kz_term:to_binary(Binding), []}). -spec b_add_binding(kz_types:server_ref(), kz_term:ne_binary() | atom(), kz_term:proplist()) -> 'ok'. b_add_binding(Srv, Binding, Props) when is_binary(Binding) orelse is_atom(Binding) -> gen_server:call(Srv, {'add_binding', kz_term:to_binary(Binding), Props}). %%------------------------------------------------------------------------------ %% @doc Add responder to a queue. %% It is expected that responders have been set up already, prior to %% binding the new queue. %% @end %%------------------------------------------------------------------------------ -spec add_queue(kz_types:server_ref(), binary(), kz_term:proplist(), binding() | bindings()) -> {'ok', kz_term:ne_binary()} | {'error', any()}. add_queue(Srv, QueueName, QueueProps, {_Type, _Props}=Binding) -> add_queue(Srv, QueueName, QueueProps, [Binding]); add_queue(Srv, QueueName, QueueProps, [{_,_}|_]=Bindings) -> gen_server:call(Srv, {'add_queue', QueueName, QueueProps, Bindings}). -spec rm_queue(kz_types:server_ref(), kz_term:ne_binary()) -> 'ok'. rm_queue(Srv, ?NE_BINARY = QueueName) -> gen_server:cast(Srv, {'rm_queue', QueueName}). -spec other_queues(kz_types:server_ref()) -> kz_term:ne_binaries(). other_queues(Srv) -> gen_server:call(Srv, 'other_queues'). -spec rm_binding(kz_types:server_ref(), binding()) -> 'ok'. rm_binding(Srv, {Binding, Props}) -> rm_binding(Srv, Binding, Props). -spec rm_binding(kz_types:server_ref(), kz_term:ne_binary() | atom(), kz_term:proplist()) -> 'ok'. rm_binding(Srv, Binding, Props) -> gen_server:cast(Srv, {'rm_binding', kz_term:to_binary(Binding), Props}). -spec federated_event(kz_types:server_ref(), kz_json:object(), basic_deliver(), amqp_basic()) -> 'ok'. federated_event(Srv, JObj, BasicDeliver, BasicData) -> gen_server:cast(Srv, {'federated_event', JObj, BasicDeliver, BasicData}). -spec execute(kz_types:server_ref(), module(), atom(), [any()]) -> 'ok'. execute(Srv, Module, Function, Args) -> gen_server:cast(Srv, {'$execute', Module, Function, Args}). -spec execute(kz_types:server_ref(), atom(), [any()]) -> 'ok'. execute(Srv, Function, Args) -> gen_server:cast(Srv, {'$execute', Function, Args}). -spec execute(kz_types:server_ref(), function()) -> 'ok'. execute(Srv, Function) when is_function(Function) -> gen_server:cast(Srv, {'$execute', Function}). %%%============================================================================= %%% gen_server callbacks %%%============================================================================= %%------------------------------------------------------------------------------ %% @doc Takes an existing process and turns it into a `gen_listener'. %% @end %%------------------------------------------------------------------------------ -spec init_state(list()) -> {'ok', state()} | {'stop', any()} | 'ignore'. init_state([Module, Params, ModuleState]) -> process_flag('trap_exit', 'true'), put('callid', Module), lager:debug("continuing as a gen_listener proc : ~s", [Module]), init(Module, Params, ModuleState, 'undefined'). -spec init([atom() | kz_term:proplist(),...]) -> {'ok', state()} | {'stop', any()} | 'ignore'. init([Module, Params, InitArgs]) -> process_flag('trap_exit', 'true'), put('callid', Module), lager:debug("starting new gen_listener proc : ~s", [Module]), case erlang:function_exported(Module, 'init', 1) andalso Module:init(InitArgs) of {'ok', MS} -> init(Module, Params, MS, 'undefined'); {'ok', MS, 'hibernate'} -> init(Module, Params, MS, 'undefined'); {'ok', MS, Timeout} -> init(Module, Params, MS, start_timer(Timeout)); {'stop', _R} = STOP -> STOP; 'ignore' -> 'ignore' end. -spec init(atom(), kz_term:proplist(), module_state(), kz_term:api_reference()) -> {'ok', state()}. init(Module, Params, ModuleState, TimeoutRef) -> _ = channel_requisition(Params), _ = [add_responder(self(), Mod, Events) || {Mod, Events} <- props:get_value('responders', Params, []) ], {'ok', #state{module=Module ,module_state=ModuleState ,module_timeout_ref=TimeoutRef ,params=Params ,handle_event_mfa = listener_utils:responder_mfa(Module, 'handle_event') ,auto_ack = props:is_true('auto_ack', Params, 'false') }}. %%------------------------------------------------------------------------------ %% @doc Handling call messages. %% @end %%------------------------------------------------------------------------------ -spec handle_call(any(), kz_term:pid_ref(), state()) -> handle_call_return(). handle_call({'add_queue', QueueName, QueueProps, Bindings}, _From, State) -> {Q, S} = add_other_queue(QueueName, QueueProps, Bindings, State), {'reply', {'ok', Q}, S}; handle_call('other_queues', _From, #state{other_queues=OtherQueues}=State) -> {'reply', props:get_keys(OtherQueues), State}; handle_call('queue_name', _From, #state{queue=Q}=State) -> {'reply', Q, State}; handle_call('responders', _From, #state{responders=Rs}=State) -> {'reply', Rs, State}; handle_call('bindings', _From, #state{bindings=Bs}=State) -> {'reply', Bs, State}; handle_call('is_consuming', _From, #state{is_consuming=IsC}=State) -> {'reply', IsC, State}; handle_call({'$client_call', Request}, From, State) -> handle_module_call(Request, From, State); handle_call({'add_binding', _Binding, _Props}=AddBinding, _From, State) -> case handle_cast(AddBinding, State) of {'noreply', State1} -> {'reply', 'ok', State1}; {'stop', _Reason, _State1}=Stop -> Stop end; handle_call(Request, From, State) -> handle_module_call(Request, From, State). %%------------------------------------------------------------------------------ %% @doc Handling cast messages. %% @end %%------------------------------------------------------------------------------ -spec handle_cast(any(), state()) -> handle_cast_return(). handle_cast({'ack', Delivery}, State) -> _A = (catch kz_amqp_util:basic_ack(Delivery)), {'noreply', State}; handle_cast({'nack', Delivery}, State) -> _N = (catch kz_amqp_util:basic_nack(Delivery)), {'noreply', State}; handle_cast({'add_queue', QueueName, QueueProps, Bindings}, State) -> {_, S} = add_other_queue(QueueName, QueueProps, Bindings, State), {'noreply', S}; handle_cast({'rm_queue', QueueName}, #state{other_queues=OtherQueues}=State) -> _ = [remove_binding(Binding, Props, QueueName) || {Binding, Props} <- props:get_value(QueueName, OtherQueues, []) ], {'noreply', State#state{other_queues=props:delete(QueueName, OtherQueues)}}; handle_cast({'add_responder', Responder, Keys}, #state{responders=Responders}=State) -> {'noreply' ,State#state{responders=listener_utils:add_responder(Responders, Responder, Keys)} }; handle_cast({'rm_responder', Responder, Keys}, #state{responders=Responders}=State) -> {'noreply' ,State#state{responders=listener_utils:rm_responder(Responders, Responder, Keys)} }; handle_cast({'add_binding', _, _}=AddBinding, #state{is_consuming='false'}=State) -> %% wait 100 + [100,200) ms before replaying the binding request Time = ?BIND_WAIT + 100 + rand:uniform(100), lager:debug("not consuming yet, put binding to end of message queue after ~b ms", [Time]), delayed_cast(self(), AddBinding, Time), {'noreply', State}; handle_cast({'add_binding', Binding, Props}, State) -> {'noreply', handle_add_binding(Binding, Props, State)}; handle_cast({'rm_binding', Binding, Props}, State) -> {'noreply', handle_rm_binding(Binding, Props, State)}; handle_cast({'kz_amqp_assignment', {'new_channel', 'true', Channel}}, State) -> lager:debug("channel reconnecting"), _ = kz_amqp_channel:consumer_channel(Channel), {'noreply', State}; handle_cast({'kz_amqp_assignment', {'new_channel', 'false', Channel}}, State) -> _ = kz_amqp_channel:consumer_channel(Channel), {'noreply', handle_amqp_channel_available(State)}; handle_cast({'federated_event', JObj, BasicDeliver, BasicData}, #state{params=Params}=State) -> case props:is_true('spawn_handle_event', Params, 'false') of 'true' -> kz_util:spawn(fun distribute_event/3, [JObj, {BasicDeliver, BasicData}, State]), {'noreply', State}; 'false' -> {'noreply', distribute_event(JObj, {BasicDeliver, BasicData}, State)} end; handle_cast({'$execute', Module, Function, Args} ,#state{federators=[]}=State) -> erlang:apply(Module, Function, Args), {'noreply', State}; handle_cast({'$execute', Function, Args} ,#state{federators=[]}=State) -> erlang:apply(Function, Args), {'noreply', State}; handle_cast({'$execute', Function} ,#state{federators=[]}=State) -> Function(), {'noreply', State}; handle_cast({'$execute', Module, Function, Args}=Msg ,#state{federators=Federators}=State) -> erlang:apply(Module, Function, Args), _ = [?MODULE:cast(Federator, Msg) || {_Broker, Federator} <- Federators ], {'noreply', State}; handle_cast({'$execute', Function, Args}=Msg ,#state{federators=Federators}=State) -> erlang:apply(Function, Args), _ = [?MODULE:cast(Federator, Msg) || {_Broker, Federator} <- Federators ], {'noreply', State}; handle_cast({'$execute', Function}=Msg ,#state{federators=Federators}=State ) -> Function(), _ = [?MODULE:cast(Federator, Msg) || {_Broker, Federator} <- Federators ], {'noreply', State}; handle_cast({'$client_cast', Message}, State) -> handle_module_cast(Message, State); handle_cast({'start_listener', Params}, #state{queue='undefined' ,is_consuming='false' ,responders=[] ,bindings=[] ,params=[] }=State) -> #state{module=Module ,module_state=ModuleState ,module_timeout_ref=TimeoutRef } = State, {'ok', #state{}=N} = init(Module, Params, ModuleState, TimeoutRef), {'noreply', N}; handle_cast({'start_listener', _Params}, State) -> lager:debug("gen listener asked to start listener but it is already initialized"), {'noreply', State}; handle_cast({'pause_consumers'}, #state{is_consuming='true', consumer_tags=Tags}=State) -> lists:foreach(fun kz_amqp_util:basic_cancel/1, Tags), {'noreply', State}; handle_cast({'resume_consumers'}, #state{queue='undefined'}=State) -> {'noreply', State}; handle_cast({'resume_consumers'}, #state{is_consuming='false' ,params=Params ,queue=Q ,other_queues=OtherQueues ,auto_ack=AutoAck }=State) -> ConsumeOptions = props:get_value('consume_options', Params, []), start_consumer(Q, maybe_configure_auto_ack(ConsumeOptions, AutoAck)), _ = [start_consumer(Q1, maybe_configure_auto_ack(props:get_value('consume_options', P, []), AutoAck)) || {Q1, {_, P}} <- OtherQueues ], {'noreply', State}; handle_cast({'federator_is_consuming', Broker, 'true'}, State) -> lager:info("federator for ~p is consuming, waiting on: ~p", [Broker, State#state.waiting_federators]), Filter = fun(X) -> kz_amqp_connections:broker_available_connections(X) > 0 end, Waiting = lists:filter(Filter, State#state.waiting_federators), lager:info("available waiting brokers: ~p", [Waiting]), case lists:subtract(Waiting, [Broker]) of [] -> lager:info("all waiting federators are available!"), handle_module_cast({?MODULE, {'federators_consuming', 'true'}}, State); Remaining -> lager:info("still waiting for federators: ~p", [Remaining]), {'noreply', State#state{waiting_federators = Remaining}} end; handle_cast(Message, State) -> handle_module_cast(Message, State). -spec maybe_remove_binding(binding(), binding_module(), kz_term:proplist(), kz_term:ne_binary()) -> boolean(). maybe_remove_binding({B, P}, B, P, Q) -> lager:debug("removing ~s: ~p", [B, P]), remove_binding(B, P, Q), 'false'; maybe_remove_binding(_BP, _B, _P, _Q) -> 'true'. %%------------------------------------------------------------------------------ %% @doc Handling all non call/cast messages. %% @end %%------------------------------------------------------------------------------ -spec handle_info(any(), state()) -> kz_types:handle_info_ret(). handle_info({#'basic.deliver'{}=BD, #amqp_msg{props=#'P_basic'{content_type=CT}=Basic ,payload=Payload }} ,#state{params=Params, auto_ack=AutoAck}=State) -> _ = case AutoAck of 'true' -> (catch kz_amqp_util:basic_ack(BD)); 'false' -> 'ok' end, case props:is_true('spawn_handle_event', Params, 'false') of 'true' -> kz_util:spawn(fun handle_event/4, [Payload, CT, {BD, Basic}, State]), {'noreply', State}; 'false' -> {'noreply', handle_event(Payload, CT, {BD, Basic}, State)} end; handle_info({#'basic.return'{}=BR, #amqp_msg{props=#'P_basic'{content_type=CT} ,payload=Payload }}, State) -> handle_return(Payload, CT, BR, State); handle_info(#'basic.consume_ok'{consumer_tag=CTag}, #state{queue='undefined'}=State) -> lager:debug("received consume ok (~s) for abandoned queue", [CTag]), {'noreply', State}; handle_info(#'basic.consume_ok'{consumer_tag=CTag}, #state{consumer_tags=CTags}=State) -> gen_server:cast(self(), {?MODULE, {'is_consuming', 'true'}}), {'noreply', State#state{is_consuming='true' ,consumer_tags=[CTag | CTags] }}; handle_info(#'basic.cancel_ok'{consumer_tag=CTag}, #state{consumer_tags=CTags}=State) -> lager:debug("recv a basic.cancel_ok for tag ~s", [CTag]), gen_server:cast(self(), {?MODULE, {'is_consuming', 'false'}}), {'noreply', State#state{is_consuming='false' ,consumer_tags=lists:delete(CTag, CTags) }}; handle_info(#'basic.ack'{}=Ack, #state{}=State) -> lager:debug("recv a basic.ack ~p", [Ack]), handle_confirm(Ack, State); handle_info(#'basic.nack'{}=Nack, #state{}=State) -> lager:debug("recv a basic.nack ~p", [Nack]), handle_confirm(Nack, State); handle_info(#'channel.flow'{active=Active}, State) -> lager:debug("received channel flow (~s)", [Active]), kz_amqp_util:flow_control_reply(Active), gen_server:cast(self(), {?MODULE,{'channel_flow_control', Active}}), {'noreply', State}; handle_info('$is_gen_listener_consuming' ,#state{is_consuming='false' ,bindings=ExistingBindings ,params=Params }=State) -> _Release = (catch kz_amqp_channel:release()), _Requisition = channel_requisition(Params), {'noreply', State#state{queue='undefined' ,bindings=[] ,params=props:set_value('bindings', ExistingBindings, Params) }}; handle_info('$is_gen_listener_consuming', State) -> {'noreply', State}; handle_info({'$server_confirms', ServerConfirms}, State) -> gen_server:cast(self(), {?MODULE,{'server_confirms',ServerConfirms}}), {'noreply', State}; handle_info({'$channel_flow', Active}, State) -> gen_server:cast(self(), {?MODULE,{'channel_flow', Active}}), {'noreply', State}; handle_info(?CALLBACK_TIMEOUT_MSG, State) -> handle_callback_info('timeout', State); handle_info(Message, State) -> handle_callback_info(Message, State). %%------------------------------------------------------------------------------ %% @doc Handles the AMQP messages prior to the spawning a handler. %% Allows listeners to pass options to handlers. %% @end %%------------------------------------------------------------------------------ -spec handle_event(kz_term:ne_binary(), kz_term:ne_binary(), deliver(), state()) -> state(). handle_event(Payload, <<"application/json">>, Deliver, State) -> JObj = kz_json:decode(Payload), _ = kz_util:put_callid(JObj), distribute_event(JObj, Deliver, State); handle_event(Payload, <<"application/erlang">>, Deliver, State) -> JObj = binary_to_term(Payload), _ = kz_util:put_callid(JObj), distribute_event(JObj, Deliver, State). %%------------------------------------------------------------------------------ %% @doc Handles the AMQP messages prior to the spawning a handler. %% Allows listeners to pass options to handlers. %% @end %%------------------------------------------------------------------------------ -spec handle_return(kz_term:ne_binary(), kz_term:ne_binary(), #'basic.return'{}, state()) -> handle_cast_return(). handle_return(Payload, <<"application/json">>, BR, State) -> JObj = kz_json:decode(Payload), _ = kz_util:put_callid(JObj), handle_return(JObj, BR, State); handle_return(Payload, <<"application/erlang">>, BR, State) -> JObj = binary_to_term(Payload), _ = kz_util:put_callid(JObj), handle_return(JObj, BR, State). -spec handle_return(kz_json:object(), #'basic.return'{}, state()) -> handle_cast_return(). handle_return(JObj, BR, State) -> Msg = {?MODULE, {'return', JObj, basic_return_to_jobj(BR)}}, handle_module_cast(Msg, State). -spec basic_return_to_jobj(#'basic.return'{}) -> kz_json:object(). basic_return_to_jobj(#'basic.return'{reply_code=Code ,reply_text=Msg ,exchange=Exchange ,routing_key=RoutingKey }) -> kz_json:from_list([{<<"code">>, Code} ,{<<"message">>, kz_term:to_binary(Msg)} ,{<<"exchange">>, kz_term:to_binary(Exchange)} ,{<<"routing_key">>, kz_term:to_binary(RoutingKey)} ]). -spec handle_confirm(#'basic.ack'{} | #'basic.nack'{}, state()) -> handle_cast_return(). handle_confirm(Confirm, State) -> Msg = {?MODULE, {'confirm', Confirm}}, handle_module_cast(Msg, State). %%------------------------------------------------------------------------------ %% @doc %% @end %%------------------------------------------------------------------------------ -spec terminate(any(), state()) -> 'ok'. terminate(Reason, #state{module=Module ,module_state=ModuleState ,federators=Fs ,consumer_tags=Tags }) -> _ = (catch(lists:foreach(fun kz_amqp_util:basic_cancel/1, Tags))), _ = (catch Module:terminate(Reason, ModuleState)), _ = (catch kz_amqp_channel:release()), _ = [listener_federator:stop(F) || {_Broker, F} <- Fs], lager:debug("~s terminated cleanly, going down", [Module]). %%------------------------------------------------------------------------------ %% @doc Convert process state when code is changed. %% @end %%------------------------------------------------------------------------------ -spec code_change(any(), state(), any()) -> {'ok', state()}. code_change(_OldVersion, State, _Extra) -> {'ok', State}. %%%============================================================================= %%% Internal functions %%%============================================================================= %%------------------------------------------------------------------------------ %% @doc %% @end %%------------------------------------------------------------------------------ -spec handle_callback_info(any(), state()) -> handle_info_return(). handle_callback_info(Message, #state{module=Module ,module_state=ModuleState ,module_timeout_ref=OldRef }=State) -> _ = stop_timer(OldRef), try Module:handle_info(Message, ModuleState) of {'noreply', ModuleState1} -> {'noreply', State#state{module_state=ModuleState1}}; {'noreply', ModuleState1, Timeout} -> Ref = start_timer(Timeout), {'noreply', State#state{module_state=ModuleState1 ,module_timeout_ref=Ref } }; {'stop', Reason, ModuleState1} -> {'stop', Reason, State#state{module_state=ModuleState1}} catch _E:R -> ST = erlang:get_stacktrace(), lager:debug("handle_info exception: ~s: ~p", [_E, R]), kz_util:log_stacktrace(ST), {'stop', R, State} end. -spec format_status('normal' | 'terminate', [kz_term:proplist() | state()]) -> any(). format_status(_Opt, [_PDict, #state{module=Module ,module_state=ModuleState }=State]) -> case erlang:function_exported(Module, 'format_status', 2) of 'true' -> Module:format_status(_Opt, [_PDict, ModuleState]); 'false' -> [{'data', [{"Module State", ModuleState} ,{"Module", Module} ,{"Listener State", State} ] }] end. -spec distribute_event(kz_json:object(), deliver(), state()) -> state(). distribute_event(JObj, Deliver, State) -> case callback_handle_event(JObj, Deliver, State) of 'ignore' -> State; {'ignore', ModuleState} -> State#state{module_state=ModuleState}; {CallbackData, ModuleState} -> distribute_event(CallbackData, JObj, Deliver, State#state{module_state=ModuleState}); CallbackData -> distribute_event(CallbackData, JObj, Deliver, State) end. -spec distribute_event(callback_data(), kz_json:object(), deliver(), state()) -> state(). distribute_event(CallbackData, JObj, Deliver, #state{responders=Responders ,consumer_key=ConsumerKey }=State) -> Key = kz_util:get_event_type(JObj), Channel = kz_amqp_channel:consumer_channel(), _ = [kz_util:spawn(fun client_handle_event/6, [JObj ,Channel ,ConsumerKey ,Callback ,CallbackData ,Deliver ]) || {Evt, Callback} <- Responders, maybe_event_matches_key(Key, Evt) ], State. -spec client_handle_event(kz_json:object(), kz_amqp_channel:consumer_channel(), kz_amqp_channel:consumer_pid(), responder_mfa(), callback_data(), deliver()) -> any(). client_handle_event(JObj, 'undefined', ConsumerKey, Callback, CallbackData, Deliver) -> _ = kz_util:put_callid(JObj), _ = kz_amqp_channel:consumer_pid(ConsumerKey), client_handle_event(JObj, Callback, CallbackData, Deliver); client_handle_event(JObj, Channel, ConsumerKey, Callback, CallbackData, Deliver) -> _ = kz_util:put_callid(JObj), _ = kz_amqp_channel:consumer_pid(ConsumerKey), _ = is_process_alive(Channel) andalso kz_amqp_channel:consumer_channel(Channel), client_handle_event(JObj, Callback, CallbackData, Deliver). -spec client_handle_event(kz_json:object(), responder_mfa(), callback_data(), deliver()) -> any(). client_handle_event(JObj, {Fun, 4}, CallbackData, {BasicDeliver, Basic}) -> Fun(JObj, CallbackData, BasicDeliver, Basic); client_handle_event(JObj, {Fun, 3}, CallbackData, {BasicDeliver, _Basic}) -> Fun(JObj, CallbackData, BasicDeliver); client_handle_event(JObj, {Fun, 2}, CallbackData, _Deliver) -> Fun(JObj, CallbackData); client_handle_event(JObj, {Module, Fun, 4}, CallbackData, {BasicDeliver, Basic}) -> Module:Fun(JObj, CallbackData, BasicDeliver, Basic); client_handle_event(JObj, {Module, Fun, 3}, CallbackData, {BasicDeliver, _Basic}) -> Module:Fun(JObj, CallbackData, BasicDeliver); client_handle_event(JObj, {Module, Fun, 2}, CallbackData, _Deliver) -> Module:Fun(JObj, CallbackData). -spec callback_handle_event(kz_json:object(), deliver(), state()) -> 'ignore' | {'ignore', module_state()} | callback_data() | {callback_data(), module_state()}. callback_handle_event(_JObj ,{BasicDeliver, Basic} ,#state{module_state=ModuleState ,queue=Queue ,other_queues=OtherQueues ,self=Self ,handle_event_mfa='undefined' } ) -> [{'server', Self} ,{'queue', Queue} ,{'basic', Basic} ,{'deliver', BasicDeliver} ,{'other_queues', props:get_keys(OtherQueues)} ,{'state', ModuleState} ]; callback_handle_event(JObj ,{BasicDeliver, Basic}=Deliver ,#state{module_state=ModuleState ,queue=Queue ,other_queues=OtherQueues ,self=Self ,handle_event_mfa=MFA } ) -> case callback_handle_event(JObj, Deliver, MFA, ModuleState) of 'ignore' -> 'ignore'; {'ignore', _NewModuleState} = Reply -> Reply; {'reply', Props} when is_list(Props) -> [{'server', Self} ,{'queue', Queue} ,{'basic', Basic} ,{'deliver', BasicDeliver} ,{'other_queues', props:get_keys(OtherQueues)} | Props ]; {'reply', Props, NewModuleState} when is_list(Props) -> {[{'server', Self} ,{'queue', Queue} ,{'basic', Basic} ,{'deliver', BasicDeliver} ,{'other_queues', props:get_keys(OtherQueues)} | Props ] ,NewModuleState }; {'EXIT', Why} -> lager:error("CRASH in handle_event ~p", [Why]), [{'server', Self} ,{'queue', Queue} ,{'basic', Basic} ,{'deliver', BasicDeliver} ,{'other_queues', props:get_keys(OtherQueues)} ,{'state', ModuleState} ] end. -spec callback_handle_event(kz_json:object(), deliver(), mfa(), module_state()) -> handle_event_return() | {'EXIT', any()}. callback_handle_event(JObj, _, {Module, Fun, 2}, ModuleState) -> catch Module:Fun(JObj, ModuleState); callback_handle_event(JObj, {BasicDeliver, _}, {Module, Fun, 3}, ModuleState) -> catch Module:Fun(JObj, BasicDeliver, ModuleState); callback_handle_event(JObj, {BasicDeliver, Basic}, {Module, Fun, 4}, ModuleState) -> catch Module:Fun(JObj, BasicDeliver, Basic, ModuleState); callback_handle_event(_, _, _, _) -> {'EXIT', 'not_exported'}. %% allow wildcard (<<"*">>) in the Key to match either (or both) Category and Name -spec maybe_event_matches_key(responder_callback_mapping(), responder_callback_mapping()) -> boolean(). maybe_event_matches_key(Evt, Evt) -> 'true'; maybe_event_matches_key({_,_}, {<<"*">>, <<"*">>}) -> 'true'; maybe_event_matches_key({_, Name}, {<<"*">>, Name}) -> 'true'; maybe_event_matches_key({Cat, _}, {Cat, <<"*">>}) -> 'true'; maybe_event_matches_key(_A, _B) -> 'false'. -spec start_amqp(kz_term:proplist(), boolean()) -> {'ok', binary()} | {'error', _}. start_amqp(Props, AutoAck) -> QueueProps = props:get_value('queue_options', Props, []), QueueName = props:get_value('queue_name', Props, <<>>), ConsumeOptions = props:get_value('consume_options', Props, []), case kz_amqp_util:new_queue(QueueName, QueueProps) of {'error', _}=E -> E; Q -> set_qos(props:get_value('basic_qos', Props)), 'ok' = start_consumer(Q, maybe_configure_auto_ack(ConsumeOptions, AutoAck)), lager:debug("queue started: ~s", [Q]), {'ok', Q} end. -spec set_qos('undefined' | non_neg_integer()) -> 'ok'. set_qos('undefined') -> 'ok'; set_qos(N) when is_integer(N), N >= 0 -> kz_amqp_util:basic_qos(N). -spec start_consumer(kz_term:ne_binary(), kz_term:proplist()) -> 'ok'. start_consumer(Q, 'undefined') -> kz_amqp_util:basic_consume(Q, []); start_consumer(Q, ConsumeProps) -> kz_amqp_util:basic_consume(Q, ConsumeProps). -spec remove_binding(binding_module(), kz_term:proplist(), kz_term:api_binary()) -> 'ok'. remove_binding(Binding, Props, Q) -> Wapi = list_to_binary([<<"kapi_">>, kz_term:to_binary(Binding)]), lager:debug("trying to remove bindings with ~s:unbind_q(~s, ~p)", [Wapi, Q, Props]), try (kz_term:to_atom(Wapi, 'true')):unbind_q(Q, Props) catch 'error':'undef' -> erlang:error({'api_module_undefined', Wapi}) end. -spec create_binding(kz_term:ne_binary(), kz_term:proplist(), kz_term:ne_binary()) -> any(). create_binding(Binding, Props, Q) when not is_binary(Binding) -> create_binding(kz_term:to_binary(Binding), Props, Q); create_binding(Binding, Props, Q) -> Wapi = list_to_binary([<<"kapi_">>, kz_term:to_binary(Binding)]), try (kz_term:to_atom(Wapi, 'true')):bind_q(Q, Props) catch 'error':'undef' -> erlang:error({'api_module_undefined', Wapi}) end. -spec stop_timer(kz_term:api_reference()) -> non_neg_integer() | 'false'. stop_timer('undefined') -> 'false'; stop_timer(Ref) when is_reference(Ref) -> erlang:cancel_timer(Ref). -spec start_timer(timeout()) -> kz_term:api_reference(). start_timer(0) -> self() ! ?CALLBACK_TIMEOUT_MSG, 'undefined'; start_timer(Timeout) when is_integer(Timeout) andalso Timeout >= 0 -> erlang:send_after(Timeout, self(), ?CALLBACK_TIMEOUT_MSG); start_timer(_) -> 'undefined'. -spec add_other_queue(binary(), kz_term:proplist(), kz_term:proplist(), state()) -> {kz_term:ne_binary(), state()}. add_other_queue(<<>>, QueueProps, Bindings, #state{other_queues=OtherQueues, auto_ack=AutoAck}=State) -> {'ok', Q} = start_amqp(QueueProps, AutoAck), gen_server:cast(self(), {?MODULE, {'created_queue', Q}}), _ = [create_binding(Type, BindProps, Q) || {Type, BindProps} <- Bindings], {Q, State#state{other_queues=[{Q, {Bindings, QueueProps}}|OtherQueues]}}; add_other_queue(QueueName, QueueProps, Bindings, #state{other_queues=OtherQueues, auto_ack=AutoAck}=State) -> {'ok', Q} = start_amqp([{'queue_name', QueueName} | QueueProps], AutoAck), gen_server:cast(self(), {?MODULE, {'created_queue', Q}}), _ = [create_binding(Type, BindProps, Q) || {Type, BindProps} <- Bindings], case props:get_value(QueueName, OtherQueues) of 'undefined' -> {Q, State#state{other_queues=[{Q, {Bindings, QueueProps}}|OtherQueues]}}; OldBindings -> {Q, State#state{other_queues=[{Q, {Bindings ++ OldBindings, QueueProps}} | props:delete(QueueName, OtherQueues) ]}} end. -spec handle_module_call(any(), kz_term:pid_ref(), state()) -> handle_call_return(). handle_module_call(Request, From, #state{module=Module ,module_state=ModuleState ,module_timeout_ref=OldRef }=State) -> _ = stop_timer(OldRef), try Module:handle_call(Request, From, ModuleState) of {'reply', Reply, ModuleState1} -> {'reply', Reply ,State#state{module_state=ModuleState1 ,module_timeout_ref='undefined' } }; {'reply', Reply, ModuleState1, Timeout} -> {'reply', Reply ,State#state{module_state=ModuleState1 ,module_timeout_ref=start_timer(Timeout) } }; {'noreply', ModuleState1} -> {'noreply', State#state{module_state=ModuleState1}}; {'noreply', ModuleState1, Timeout} -> {'noreply' ,State#state{module_state=ModuleState1 ,module_timeout_ref=start_timer(Timeout) } }; {'stop', Reason, ModuleState1} -> {'stop', Reason, State#state{module_state=ModuleState1}}; {'stop', Reason, Reply, ModuleState1} -> {'stop', Reason, Reply, State#state{module_state=ModuleState1}} catch _E:R -> ST = erlang:get_stacktrace(), lager:debug("handle_call exception: ~s: ~p", [_E, R]), kz_util:log_stacktrace(ST), {'stop', R, State} end. -spec handle_module_cast(any(), state()) -> handle_cast_return(). handle_module_cast(Msg, #state{module=Module ,module_state=ModuleState ,module_timeout_ref=OldRef }=State) -> _ = stop_timer(OldRef), try Module:handle_cast(Msg, ModuleState) of {'noreply', ModuleState1} -> {'noreply', State#state{module_state=ModuleState1}}; {'noreply', ModuleState1, Timeout} -> Ref = start_timer(Timeout), {'noreply', State#state{module_state=ModuleState1 ,module_timeout_ref=Ref } }; {'stop', Reason, ModuleState1} -> {'stop', Reason, State#state{module_state=ModuleState1}} catch _E:R -> ST = erlang:get_stacktrace(), lager:debug("handle_cast exception: ~s: ~p", [_E, R]), kz_util:log_stacktrace(ST), {'stop', R, State} end. -spec handle_rm_binding(binding(), kz_term:proplist(), state()) -> state(). handle_rm_binding(Binding, Props, #state{queue=Q ,bindings=Bs }=State) -> KeepBs = [BP || BP <- Bs, maybe_remove_binding(BP ,kz_term:to_binary(Binding) ,Props ,Q )], maybe_remove_federated_binding(Binding, Props, State), State#state{bindings=KeepBs}. -spec handle_add_binding(binding_module(), kz_term:proplist(), state()) -> state(). handle_add_binding(Binding, Props, #state{queue=Q ,bindings=Bs }=State) -> case lists:keyfind(Binding, 1, Bs) of 'false' -> lager:debug("creating new binding: '~s'", [Binding]), create_binding(Binding, Props, Q), maybe_update_federated_bindings(State#state{bindings=[{Binding, Props}|Bs]}); {Binding, ExistingProps} -> handle_existing_binding(Binding, Props, State, Q, ExistingProps, Bs) end. -spec handle_existing_binding(binding_module(), kz_term:proplist(), state(), kz_term:ne_binary(), kz_term:proplist(), bindings()) -> state(). handle_existing_binding(Binding, Props, State, Q, ExistingProps, Bs) -> case lists:all(fun({K,V}) -> props:get_value(K, ExistingProps) =:= V; (K) -> props:get_value(K, ExistingProps) =:= 'true' end ,Props ) of 'true' when length(Props) =:= length(ExistingProps)-> lager:debug("binding ~s with props exists", [Binding]), State; _ -> lager:debug("creating existing binding '~s' with new props: ~p", [Binding, Props]), create_binding(Binding, Props, Q), maybe_update_federated_bindings(State#state{bindings=[{Binding, Props}|Bs]}) end. -spec maybe_update_federated_bindings(state()) -> state(). maybe_update_federated_bindings(#state{bindings=[{_Binding, Props}|_]}=State) -> case is_federated_binding(Props) of 'false' -> State; 'true' -> update_federated_bindings(State) end. -spec is_federated_binding(kz_term:proplist()) -> boolean(). is_federated_binding(Props) -> props:get_value('federate', Props) =:= 'true'. -spec update_federated_bindings(state()) -> state(). update_federated_bindings(#state{bindings=[{Binding, Props}|_] ,federators=Fs }=State) -> case kz_amqp_connections:federated_brokers() of [] -> lager:debug("no federated brokers to connect to, skipping federating binding '~s'", [Binding]), State; FederatedBrokers -> NonFederatedProps = props:delete('federate', Props), {_Existing, New} = broker_connections(Fs, FederatedBrokers), 'ok' = update_existing_listeners_bindings(Fs, Binding, NonFederatedProps), {'ok', NewListeners} = start_new_listeners(New, Binding, NonFederatedProps, State), State#state{federators=NewListeners ++ Fs, waiting_federators=New ++ State#state.waiting_federators} end. -spec maybe_remove_federated_binding(binding(), kz_term:proplist(), state()) -> 'ok'. maybe_remove_federated_binding(Binding, Props, State) -> maybe_remove_federated_binding(is_federated_binding(Props), Binding, Props, State). -spec maybe_remove_federated_binding(boolean(), binding(), kz_term:proplist(), state()) -> 'ok'. maybe_remove_federated_binding('true', Binding, Props, #state{federators=Fs}) when Fs =/= [] -> NonFederatedProps = props:delete('federate', Props), remove_federated_binding(Fs, Binding, NonFederatedProps); maybe_remove_federated_binding(_Flag, _Binging, _Props, _State) -> 'ok'. -spec broker_connections(federator_listeners(), kz_term:ne_binaries()) -> {kz_term:ne_binaries(), kz_term:ne_binaries()}. broker_connections(Listeners, Brokers) -> lists:partition(fun(Broker) -> props:get_value(Broker, Listeners) =/= 'undefined' end, Brokers). -spec start_new_listeners(kz_term:ne_binaries(), binding_module(), kz_term:proplist(), state()) -> {'ok', federator_listeners()}. start_new_listeners(Brokers, Binding, Props, State) -> {'ok', [start_new_listener(Broker, Binding, Props, State) || Broker <- Brokers ]}. -spec start_new_listener(kz_term:ne_binary(), binding_module(), kz_term:proplist(), state()) -> federator_listener(). start_new_listener(Broker, Binding, Props, #state{params=Ps}) -> FederateParams = create_federated_params({Binding, Props}, Ps), {'ok', Pid} = listener_federator:start_link(self(), Broker, FederateParams), lager:debug("started federated listener on broker ~s: ~p", [Broker, Pid]), {Broker, Pid}. -spec remove_federated_binding(federator_listeners(), binding_module(), kz_term:proplist()) -> 'ok'. remove_federated_binding(Listeners, Binding, Props) -> _ = [?MODULE:rm_binding(Pid, Binding, Props) || {_Broker, Pid} <- Listeners ], 'ok'. -spec update_existing_listeners_bindings(federator_listeners(), binding_module(), kz_term:proplist()) -> 'ok'. update_existing_listeners_bindings(Listeners, Binding, Props) -> _ = [update_existing_listener_bindings(Listener, Binding, Props) || Listener <- Listeners ], 'ok'. -spec update_existing_listener_bindings(federator_listener(), binding_module(), kz_term:proplist()) -> 'ok'. update_existing_listener_bindings({_Broker, Pid}, Binding, Props) -> lager:debug("updating listener ~p with ~s", [Pid, Binding]), ?MODULE:add_binding(Pid, Binding, Props). -spec create_federated_params({binding_module(), kz_term:proplist()}, kz_term:proplist()) -> kz_term:proplist(). create_federated_params(FederateBindings, Params) -> QueueOptions = props:get_value('queue_options', Params, []), [{'responders', []} ,{'bindings', [FederateBindings]} ,{'queue_name', federated_queue_name(Params, QueueOptions)} ,{'queue_options', QueueOptions} ,{'consume_options', props:get_value('consume_options', Params, [])} ]. -spec federated_queue_name(kz_term:proplist(), kz_term:proplist()) -> kz_term:api_binary(). federated_queue_name(Params, Options) -> QueueName = props:get_value('queue_name', Params, <<>>), IsGlobalQueue = props:is_true('federated_queue_name_is_global', Options, 'false'), case kz_term:is_empty(QueueName) of 'true' -> QueueName; 'false' when IsGlobalQueue -> QueueName; 'false' -> Zone = kz_config:zone('binary'), <> end. -spec handle_amqp_channel_available(state()) -> state(). handle_amqp_channel_available(#state{params=Params}=State) -> lager:debug("channel started, let's connect"), case maybe_declare_exchanges(props:get_value('declare_exchanges', Params, [])) of 'ok' -> handle_exchanges_ready(State); {'error', _E} -> lager:debug("error declaring exchanges : ~p", [_E]), handle_exchanges_failed(State) end. -spec handle_exchanges_ready(state()) -> state(). handle_exchanges_ready(#state{params=Params, auto_ack=AutoAck}=State) -> case start_amqp(Params, AutoAck) of {'ok', Q} -> handle_amqp_started(State, Q); {'error', Reason} -> lager:error("start amqp error ~p", [Reason]), handle_amqp_errored(State) end. -spec handle_amqp_started(state(), kz_term:ne_binary()) -> state(). handle_amqp_started(#state{params=Params}=State, Q) -> State1 = start_initial_bindings(State#state{queue=Q}, Params), gen_server:cast(self(), {?MODULE, {'created_queue', Q}}), maybe_server_confirms(props:get_value('server_confirms', Params, 'false')), maybe_channel_flow(props:get_value('channel_flow', Params, 'false')), erlang:send_after(?TIMEOUT_RETRY_CONN, self(), '$is_gen_listener_consuming'), State1#state{is_consuming='false'}. -spec handle_amqp_errored(state()) -> state(). handle_amqp_errored(#state{params=Params}=State) -> #kz_amqp_assignment{channel=Channel} = kz_amqp_assignments:get_channel(), _ = (catch kz_amqp_channel:release()), kz_amqp_channel:close(Channel), timer:sleep(?SERVER_RETRY_PERIOD), _ = channel_requisition(Params), State#state{is_consuming='false'}. -spec handle_exchanges_failed(state()) -> state(). handle_exchanges_failed(#state{params=Params}=State) -> #kz_amqp_assignment{channel=Channel} = kz_amqp_assignments:get_channel(), _ = (catch kz_amqp_channel:release()), kz_amqp_channel:close(Channel), timer:sleep(?SERVER_RETRY_PERIOD), _ = channel_requisition(Params), State#state{is_consuming='false'}. -spec maybe_server_confirms(boolean()) -> 'ok'. maybe_server_confirms('true') -> kz_amqp_util:confirm_select(); maybe_server_confirms(_) -> 'ok'. -spec maybe_channel_flow(boolean()) -> 'ok'. maybe_channel_flow('true') -> kz_amqp_util:flow_control(); maybe_channel_flow(_) -> 'ok'. -spec maybe_declare_exchanges(declare_exchanges()) -> command_ret(). maybe_declare_exchanges([]) -> 'ok'; maybe_declare_exchanges(Exchanges) -> maybe_declare_exchanges(kz_amqp_assignments:get_channel(), Exchanges). -spec maybe_declare_exchanges(kz_amqp_assignment(), declare_exchanges()) -> command_ret(). maybe_declare_exchanges(_Channel, []) -> 'ok'; maybe_declare_exchanges(Channel, [{Ex, Type, Opts} | Exchanges]) -> declare_exchange(Channel, kz_amqp_util:declare_exchange(Ex, Type, Opts), Exchanges); maybe_declare_exchanges(Channel, [{Ex, Type} | Exchanges]) -> declare_exchange(Channel, kz_amqp_util:declare_exchange(Ex, Type), Exchanges). -spec declare_exchange(kz_amqp_assignment(), kz_amqp_exchange(), declare_exchanges()) -> command_ret(). declare_exchange(Channel, Exchange, Exchanges) -> case kz_amqp_channel:command(Channel, Exchange) of {'ok', _} -> maybe_declare_exchanges(Channel, Exchanges); E -> E end. -spec start_initial_bindings(state(), kz_term:proplist()) -> state(). start_initial_bindings(State, Params) -> lists:foldl(fun({Binding, Props}, StateAcc) -> handle_add_binding(kz_term:to_binary(Binding), Props, StateAcc) end ,State ,props:get_value('bindings', Params, []) ). -spec channel_requisition(kz_term:proplist()) -> boolean(). channel_requisition([]) -> 'false'; channel_requisition(Params) -> case props:get_value('broker_tag', Params) of 'undefined' -> case props:get_value('broker', Params) of 'undefined' -> kz_amqp_channel:requisition(); Broker -> maybe_add_broker_connection(Broker) end; Tag -> case kz_amqp_connections:broker_with_tag(Tag) of 'undefined' -> kz_amqp_channel:requisition(); Broker -> maybe_add_broker_connection(Broker) end end. -spec maybe_add_broker_connection(binary()) -> boolean(). maybe_add_broker_connection(Broker) -> Count = kz_amqp_connections:broker_available_connections(Broker), maybe_add_broker_connection(Broker, Count). -spec maybe_add_broker_connection(binary(), non_neg_integer()) -> boolean(). maybe_add_broker_connection(Broker, Count) when Count =:= 0 -> _Connection = kz_amqp_connections:add(Broker, kz_binary:rand_hex(6), [<<"hidden">>]), kz_amqp_channel:requisition(self(), Broker); maybe_add_broker_connection(Broker, _Count) -> kz_amqp_channel:requisition(self(), Broker). -spec start_listener(pid(), kz_term:proplist()) -> 'ok'. start_listener(Srv, Params) -> gen_server:cast(Srv, {'start_listener', Params}). maybe_configure_auto_ack(Props, 'false') -> Props; maybe_configure_auto_ack(Props, 'true') -> [{'no_ack', 'false'} | props:delete('no_ack', Props)].