%%%-----------------------------------------------------------------------------
%%% @copyright (C) 2012-2019, 2600Hz
%%% @doc Worker with a dedicated targeted queue.
%%% Inserts Queue Name as the `Server-ID' and proxies the AMQP request
%%% (expects responses to the request).
%%%
%%% There are two primary interactions, `call' and `call_collect':
%%%
%%% - `call'
- The semantics of call are similar to `gen_server''s call: send a
%%% request, expect a response back (or timeout)
%%% - `call_collect'
- uses the timeout to collect responses (successful or not)
%%% and returns the resulting list of responses
%%%
%%%
%%% @author James Aimonetti
%%% @end
%%%-----------------------------------------------------------------------------
-module(kz_amqp_worker).
-behaviour(gen_listener).
%% API
-export([start_link/1
,call/2, call/3, call/4, call/5
,call_collect/2, call_collect/3, call_collect/4
,call_custom/4, call_custom/5
,cast/2, cast/3
,relay_to/2, stop_relay/2
,default_timeout/0
,collect_until_timeout/0
,collect_from_whapp/1
,collect_from_whapp_or_validate/2
,send_request/4
,checkout_worker/0, checkout_worker/1
,checkin_worker/1, checkin_worker/2
,worker_pool/0, worker_pool/1
]).
%% gen_listener callbacks
-export([init/1
,handle_call/3
,handle_cast/2
,handle_info/2
,handle_event/2
,terminate/2
,code_change/3
]).
-include("kz_amqp_util.hrl").
-define(SERVER, ?MODULE).
-define(FUDGE, 2600).
-define(BINDINGS, [{'self', []}]).
-define(QUEUE_NAME, <<>>).
-define(QUEUE_OPTIONS, []).
-define(CONSUME_OPTIONS, []).
-type publish_fun() :: fun((kz_term:api_terms()) -> any()).
-type validate_fun() :: fun((kz_term:api_terms()) -> boolean()).
-type collect_until_acc() :: any().
-type collect_until_acc_fun() :: fun((kz_json:objects(), collect_until_acc()) -> boolean() | {boolean(), collect_until_acc()}).
-type collect_until_fun() :: fun((kz_json:objects()) -> boolean()) |
collect_until_acc_fun() |
{collect_until_acc_fun(), collect_until_acc()}.
-type whapp() :: atom() | kz_term:ne_binary().
-type collect_until() :: collect_until_fun() |
whapp() |
{whapp(), validate_fun() | boolean()} | %% {Whapp, VFun | IncludeFederated}
{whapp(), validate_fun(), boolean()} | %% {Whapp, VFun, IncludeFederated}
{whapp(), boolean(), boolean()} | %% {Whapp, IncludeFederated, IsShared}
{whapp(), validate_fun(), boolean(), boolean()}. %% {Whapp, VFun, IncludeFederated, IsShared}
-type timeout_or_until() :: timeout() | collect_until().
%% case {IsFederated, IsShared} of
%% {'true', 'true'} -> Get from {0,1} whapp instance per zone
%% {'false', 'true'} -> %% Get from {0,1} whapp instance in the local zone
%% _Otherwise -> Get from all instances, either local or federated
-export_type([publish_fun/0
,validate_fun/0
,collect_until/0
,timeout_or_until/0
,request_return/0
,cast_return/0
]).
-record(state, {current_msg_id :: kz_term:api_binary()
,client_pid :: kz_term:api_pid()
,client_ref :: kz_term:api_reference()
,client_from :: kz_term:api_pid_ref() | 'relay'
,client_vfun :: validate_fun() | 'undefined'
,client_cfun = collect_until_timeout() :: collect_until_fun()
,responses :: kz_term:api_objects()
,neg_resp :: kz_term:api_object()
,neg_resp_count = 0 :: non_neg_integer()
,neg_resp_threshold = 1 :: pos_integer()
,req_timeout_ref :: kz_term:api_reference()
,req_start_time :: kz_time:now() | 'undefined'
,callid :: kz_term:api_binary()
,pool_ref :: kz_types:server_ref()
,defer_response :: kz_term:api_object()
,queue :: kz_term:api_binary()
,confirms = 'false' :: boolean()
,flow = 'undefined' :: kz_term:api_boolean()
,acc = 'undefined' :: any()
,defer = 'undefined' :: 'undefined' | {any(), {pid(), reference()}}
,confirm_timeout_ref :: kz_term:api_reference()
,confirm_start_time :: kz_time:now() | 'undefined'
,timeout :: non_neg_integer() | 'undefined'
,method :: atom()
,reply_to :: kz_term:api_pid_ref()
}).
-type state() :: #state{}.
%%%=============================================================================
%%% API
%%%=============================================================================
%%------------------------------------------------------------------------------
%% @doc Starts the server.
%% @end
%%------------------------------------------------------------------------------
-spec start_link(kz_term:proplist()) -> kz_types:startlink_ret().
start_link(Args) ->
gen_listener:start_link(?SERVER, [{'bindings', maybe_bindings(Args)}
,{'queue_name', maybe_queuename(Args)}
,{'queue_options', ?QUEUE_OPTIONS}
,{'consume_options', ?CONSUME_OPTIONS}
| maybe_broker(Args)
++ maybe_exchanges(Args)
++ maybe_server_confirms(Args)
], [Args]).
-spec maybe_broker(kz_term:proplist()) -> kz_term:proplist().
maybe_broker(Args) ->
case props:get_value('amqp_broker', Args) of
'undefined' -> [];
Broker -> [{'broker', Broker}]
end.
-spec maybe_queuename(kz_term:proplist()) -> binary().
maybe_queuename(Args) ->
case props:get_value('amqp_queuename_start', Args) of
'undefined' -> ?QUEUE_NAME;
QueueStart -> <<(kz_term:to_binary(QueueStart))/binary, "_", (kz_binary:rand_hex(4))/binary>>
end.
-spec maybe_bindings(kz_term:proplist()) -> kz_term:proplist().
maybe_bindings(Args) ->
case props:get_value('amqp_bindings', Args) of
'undefined' -> ?BINDINGS;
Bindings -> Bindings
end.
-spec maybe_exchanges(kz_term:proplist()) -> kz_term:proplist().
maybe_exchanges(Args) ->
case props:get_value('amqp_exchanges', Args) of
'undefined' -> [];
Exchanges -> [{'declare_exchanges', Exchanges}]
end.
-spec maybe_server_confirms(kz_term:proplist()) -> kz_term:proplist().
maybe_server_confirms(Args) ->
case props:get_value('amqp_server_confirms', Args) of
'undefined' -> [];
Confirms -> [{'server_confirms', Confirms}]
end.
-spec default_timeout() -> 2000.
default_timeout() -> 2 * ?MILLISECONDS_IN_SECOND.
-type request_return() :: {'ok', kz_json:object() | kz_json:objects()} |
{'returned', kz_json:object(), kz_json:object()} |
{'timeout', kz_json:objects()} |
{'error', any()}.
-spec call(kz_term:api_terms(), publish_fun()) -> request_return().
call(Req, PubFun) ->
call(Req, PubFun, fun kz_term:always_true/1).
-spec call(kz_term:api_terms(), publish_fun(), validate_fun()) ->
request_return().
call(Req, PubFun, VFun) ->
call(Req, PubFun, VFun, default_timeout()).
-spec call(kz_term:api_terms(), publish_fun(), validate_fun(), timeout()) ->
request_return().
call(Req, PubFun, VFun, Timeout) ->
case next_worker() of
{'error', _}=E -> E;
Worker -> call(Req, PubFun, VFun, Timeout, Worker)
end.
-spec call(kz_term:api_terms(), publish_fun(), validate_fun(), timeout(), pid() | atom()) ->
request_return().
call(Req, PubFun, VFun, Timeout, Pool) when is_atom(Pool) ->
case next_worker(Pool) of
{'error', _}=E -> E;
Worker -> call(Req, PubFun, VFun, Timeout, Worker)
end;
call(Req, PubFun, VFun, Timeout, Worker) when is_pid(Worker) ->
Prop = maybe_convert_to_proplist(Req),
try gen_listener:call(Worker
,{'request', Prop, PubFun, VFun, Timeout}
,fudge_timeout(Timeout)
)
catch
'exit':{timeout, _} ->
lager:warning("request timeout"),
{error, timeout};
_E:R ->
lager:warning("request failed: ~s: ~p", [_E, R]),
{'error', R}
after
checkin_worker(Worker)
end.
-type pool_error() :: 'pool_full' | 'poolboy_fault'.
-spec next_worker() -> pid() | {'error', pool_error()}.
next_worker() ->
next_worker(worker_pool()).
-spec next_worker(atom()) -> pid() | {'error', pool_error()}.
next_worker(Pool) ->
try poolboy:checkout(Pool, 'false', default_timeout()) of
'full' -> {'error', 'pool_full'};
Worker -> Worker
catch
_E:_R ->
lager:warning("poolboy exception: ~s: ~p", [_E, _R]),
{'error', 'poolboy_fault'}
end.
-spec checkout_worker() -> {'ok', pid()} | {'error', pool_error()}.
checkout_worker() ->
checkout_worker(worker_pool()).
-spec checkout_worker(atom()) -> {'ok', pid()} | {'error', pool_error()}.
checkout_worker(Pool) ->
try poolboy:checkout(Pool, 'false', default_timeout()) of
'full' -> {'error', 'pool_full'};
Worker -> {'ok', Worker}
catch
_E:_R ->
lager:warning("poolboy exception: ~s: ~p", [_E, _R]),
{'error', 'poolboy_fault'}
end.
-spec checkin_worker(pid()) -> 'ok'.
checkin_worker(Worker) ->
checkin_worker(Worker, worker_pool()).
-spec checkin_worker(pid(), atom()) -> 'ok'.
checkin_worker(Worker, Pool) ->
poolboy:checkin(Pool, Worker).
-spec call_custom(kz_term:api_terms(), publish_fun(), validate_fun(), gen_listener:binding()) ->
request_return().
call_custom(Req, PubFun, VFun, Bind) ->
call_custom(Req, PubFun, VFun, default_timeout(), Bind).
-spec call_custom(kz_term:api_terms(), publish_fun(), validate_fun(), timeout(), gen_listener:binding()) ->
request_return().
call_custom(Req, PubFun, VFun, Timeout, Bind) ->
case next_worker() of
{'error', _}=E -> E;
Worker -> call_custom(Req, PubFun, VFun, Timeout, Bind, Worker)
end.
-spec call_custom(kz_term:api_terms(), publish_fun(), validate_fun(), timeout(), gen_listener:binding(), pid()) ->
request_return().
call_custom(Req, PubFun, VFun, Timeout, Bind, Worker) ->
Prop = maybe_convert_to_proplist(Req),
gen_listener:add_binding(Worker, Bind),
try gen_listener:call(Worker
,{'request', Prop, PubFun, VFun, Timeout}
,fudge_timeout(Timeout)
)
catch
_E:R ->
lager:debug("request failed: ~s: ~p", [_E, R]),
{'error', R}
after
gen_listener:rm_binding(Worker, Bind),
checkin_worker(Worker)
end.
-spec call_collect(kz_term:api_terms(), publish_fun()) ->
request_return().
call_collect(Req, PubFun) ->
call_collect(Req, PubFun, default_timeout()).
-spec call_collect(kz_term:api_terms(), publish_fun(), timeout_or_until()) ->
request_return().
call_collect(Req, PubFun, UntilFun) when is_function(UntilFun) ->
call_collect(Req, PubFun, UntilFun, default_timeout());
call_collect(Req, PubFun, Whapp) when is_atom(Whapp); is_binary(Whapp) ->
call_collect(Req, PubFun, Whapp, default_timeout());
call_collect(Req, PubFun, {_, _}=Until) ->
call_collect(Req, PubFun, Until, default_timeout());
call_collect(Req, PubFun, {_, _, _}=Until) ->
call_collect(Req, PubFun, Until, default_timeout());
call_collect(Req, PubFun, {_, _, _, _}=Until) ->
call_collect(Req, PubFun, Until, default_timeout());
call_collect(Req, PubFun, Timeout) ->
call_collect(Req, PubFun, collect_until_timeout(), Timeout).
-spec call_collect(kz_term:api_terms(), publish_fun(), collect_until(), timeout()) ->
request_return().
call_collect(_Req, _PubFun, 'undefined', _Timeout) ->
lager:debug("no VFun, no responses"),
{'ok', []};
call_collect(Req, PubFun, {Whapp, IncludeFederated}, Timeout)
when (is_atom(Whapp)
orelse is_binary(Whapp)
)
andalso is_boolean(IncludeFederated) ->
CollectFromWhapp = collect_from_whapp(Whapp, IncludeFederated),
call_collect(Req, PubFun, CollectFromWhapp, Timeout);
call_collect(Req, PubFun, {Whapp, VFun}, Timeout)
when (is_atom(Whapp)
orelse is_binary(Whapp)
)
andalso is_function(VFun) ->
CollectFromWhapp = collect_from_whapp_or_validate(Whapp, VFun),
call_collect(Req, PubFun, CollectFromWhapp, Timeout);
call_collect(Req, PubFun, {Whapp, IncludeFederated, IsShared}, Timeout)
when (is_atom(Whapp)
orelse is_binary(Whapp)
)
andalso is_boolean(IncludeFederated)
andalso is_boolean(IsShared) ->
CollectFromWhapp = collect_from_whapp(Whapp, IncludeFederated, IsShared),
call_collect(Req, PubFun, CollectFromWhapp, Timeout);
call_collect(Req, PubFun, {Whapp, VFun, IncludeFederated}, Timeout)
when (is_atom(Whapp)
orelse is_binary(Whapp)
)
andalso is_function(VFun)
andalso is_boolean(IncludeFederated) ->
CollectFromWhapp = collect_from_whapp_or_validate(Whapp, VFun, IncludeFederated),
call_collect(Req, PubFun, CollectFromWhapp, Timeout);
call_collect(Req, PubFun, {Whapp, VFun, IncludeFederated, IsShared}, Timeout)
when (is_atom(Whapp)
orelse is_binary(Whapp)
)
andalso is_function(VFun)
andalso is_boolean(IncludeFederated)
andalso is_boolean(IsShared) ->
CollectFromWhapp = collect_from_whapp_or_validate(Whapp, VFun, IncludeFederated, IsShared),
call_collect(Req, PubFun, CollectFromWhapp, Timeout);
call_collect(Req, PubFun, Whapp, Timeout)
when is_atom(Whapp)
orelse is_binary(Whapp) ->
call_collect(Req, PubFun, collect_from_whapp(Whapp), Timeout);
call_collect(Req, PubFun, UntilFun, Timeout)
when is_integer(Timeout)
andalso Timeout >= 0 ->
case next_worker() of
{'error', _}=E ->
lager:debug("failed to get next worker: ~p", [E]),
E;
Worker ->
call_collect(Req, PubFun, UntilFun, Timeout, Worker)
end.
-spec call_collect(kz_term:api_terms(), publish_fun(), collect_until(), timeout(), pid()) ->
request_return().
call_collect(Req, PubFun, {UntilFun, Acc}, Timeout, Worker)
when is_function(UntilFun, 2) ->
call_collect(Req, PubFun, UntilFun, Timeout, Acc, Worker);
call_collect(Req, PubFun, UntilFun, Timeout, Worker) ->
call_collect(Req, PubFun, UntilFun, Timeout, 'undefined', Worker).
call_collect(Req, PubFun, UntilFun, Timeout, Acc, Worker) ->
Prop = maybe_convert_to_proplist(Req),
try gen_listener:call(Worker
,{'call_collect', Prop, PubFun, UntilFun, Timeout, Acc}
,fudge_timeout(Timeout)
)
catch
_E:R ->
lager:debug("request failed: ~s: ~p", [_E, R]),
{'error', R}
after
checkin_worker(Worker)
end.
-type cast_return() :: 'ok' |
{'error', any()} |
{'returned', kz_json:object(), kz_json:object()}.
-spec cast(kz_term:api_terms(), publish_fun()) -> cast_return().
cast(Req, PubFun) ->
cast(Req, PubFun, worker_pool()).
-spec cast(kz_term:api_terms(), publish_fun(), pid() | atom()) -> cast_return().
cast(Req, PubFun, Pool) when is_atom(Pool) ->
case next_worker(Pool) of
{'error', _}=E -> E;
Worker ->
Resp = cast(Req, PubFun, Worker),
checkin_worker(Worker, Pool),
Resp
end;
cast(Req, PubFun, Worker) when is_pid(Worker) ->
Prop = maybe_convert_to_proplist(Req),
try gen_listener:call(Worker, {'publish', Prop, PubFun})
catch
_E:R ->
lager:debug("request failed: ~s: ~p", [_E, R]),
{'error', R}
end.
-spec relay_to(pid() | atom(), pid()) -> 'ok'.
relay_to(Worker, RelayPid) ->
gen_listener:call(Worker, {'relay_to', RelayPid}).
-spec stop_relay(pid() | atom(), pid()) -> 'ok'.
stop_relay(Worker, RelayPid) ->
gen_listener:call(Worker, {'stop_relay', RelayPid}).
-spec collect_until_timeout() -> collect_until_fun().
collect_until_timeout() -> fun kz_term:always_false/1.
-spec collect_from_whapp(kz_term:text()) -> 'undefined' | collect_until_fun().
collect_from_whapp(Whapp) ->
collect_from_whapp(Whapp, 'false').
-spec collect_from_whapp(kz_term:text(), boolean()) ->
'undefined' | collect_until_fun().
collect_from_whapp(Whapp, IncludeFederated) ->
collect_from_whapp(Whapp, IncludeFederated, 'false').
-spec collect_from_whapp(kz_term:text(), boolean(), boolean()) ->
'undefined' | collect_until_fun().
collect_from_whapp(Whapp, IncludeFederated, IsShared) ->
Count = case {IncludeFederated, IsShared} of
{'true', 'true'} -> kz_nodes:whapp_zone_count(Whapp); %% Get from {0,1} whapp instance per zone
{'false', 'true'} -> 1; %% Get from one whapp instance
_ -> kz_nodes:whapp_count(Whapp, IncludeFederated) %% Get from all instances, either local or federated
end,
lager:debug("attempting to collect ~p responses from ~s", [Count, Whapp]),
count_fun(Count).
-spec count_fun(non_neg_integer()) -> 'undefined' | collect_until_fun().
count_fun(0) -> 'undefined';
count_fun(Count) ->
fun(Responses) -> length(Responses) >= Count end.
-spec collect_from_whapp_or_validate(kz_term:text(), validate_fun()) -> collect_until_fun().
collect_from_whapp_or_validate(Whapp, VFun) ->
collect_from_whapp_or_validate(Whapp, VFun, 'false').
-spec collect_from_whapp_or_validate(kz_term:text(),validate_fun(), boolean()) -> collect_until_fun().
collect_from_whapp_or_validate(Whapp, VFun, IncludeFederated) ->
collect_from_whapp_or_validate(Whapp, VFun, IncludeFederated, 'false').
-spec collect_from_whapp_or_validate(kz_term:text(),validate_fun(), boolean(), boolean()) -> collect_until_fun().
collect_from_whapp_or_validate(Whapp, VFun, 'true', 'true') ->
Count = kz_nodes:whapp_zone_count(Whapp),
lager:debug("attempting to collect ~p responses from ~s or the first valid", [Count, Whapp]),
collect_or_validate_fun(VFun, Count);
collect_from_whapp_or_validate(Whapp, VFun, 'false', 'true') ->
Count = 1,
lager:debug("attempting to collect ~p responses from ~s or the first valid", [Count, Whapp]),
collect_or_validate_fun(VFun, Count);
collect_from_whapp_or_validate(Whapp, VFun, IncludeFederated, 'false') ->
Count = kz_nodes:whapp_count(Whapp, IncludeFederated),
lager:debug("attempting to collect ~p responses from ~s or the first valid", [Count, Whapp]),
collect_or_validate_fun(VFun, Count).
-spec collect_or_validate_fun(validate_fun(), pos_integer()) -> collect_until_fun().
collect_or_validate_fun(VFun, 0) ->
fun([Response|_]) -> VFun(Response) end;
collect_or_validate_fun(VFun, Count) ->
fun([Response|_]=Responses) ->
length(Responses) >= Count
orelse VFun(Response)
end.
-spec send_request(kz_term:ne_binary(), kz_term:ne_binary(), publish_fun(), kz_term:proplist()) ->
'ok' | {'error', any()}.
send_request(CallId, Self, PublishFun, ReqProps)
when is_function(PublishFun, 1) ->
kz_util:put_callid(CallId),
FilteredProps = request_filter(ReqProps),
Props = request_filter(props:set_values([{?KEY_SERVER_ID, Self}
,{?KEY_QUEUE_ID, props:get_value(?KEY_SERVER_ID, FilteredProps)}
,{?KEY_LOG_ID, CallId}
]
,FilteredProps
)),
try PublishFun(Props) of
'ok' -> 'ok'
catch
_R:E ->
lager:debug("failed to publish: ~s: ~p", [_R, E]),
kz_util:log_stacktrace(),
{'error', E}
end.
-spec request_filter(kz_term:proplist()) -> kz_term:proplist().
request_filter(Props) ->
props:filter(fun request_proplist_filter/1, Props).
-spec request_proplist_filter({kz_term:proplist_key(), kz_term:proplist_value()}) -> boolean().
request_proplist_filter({<<"Server-ID">>, Value}) ->
not kz_term:is_empty(Value);
request_proplist_filter({_, 'undefined'}) -> 'false';
request_proplist_filter(_) -> 'true'.
%%%=============================================================================
%%% gen_server callbacks
%%%=============================================================================
%%------------------------------------------------------------------------------
%% @doc Initializes the server.
%% @end
%%------------------------------------------------------------------------------
-spec init(list()) -> {'ok', state()}.
init([Args]) ->
kz_util:put_callid(?DEFAULT_LOG_SYSTEM_ID),
lager:debug("starting amqp worker"),
NegThreshold = props:get_value('neg_resp_threshold', Args, 1),
Pool = props:get_value('name', Args, 'undefined'),
{'ok', #state{neg_resp_threshold=NegThreshold
,pool_ref=Pool
}}.
%%------------------------------------------------------------------------------
%% @doc Handling call messages.
%% @end
%%------------------------------------------------------------------------------
-spec handle_call(any(), kz_term:pid_ref(), state()) -> kz_types:handle_call_ret_state(state()).
handle_call(Call, From, #state{queue='undefined'}=State)
when is_tuple(Call) ->
kz_util:put_callid(element(2, Call)),
lager:debug("unable to publish message prior to queue creation - deferring"),
{'noreply', State#state{defer={Call,From}}};
handle_call(_, _, #state{flow='false'}=State) ->
lager:debug("flow control is active and server put us in waiting"),
{'reply', {'error', 'flow_control'}, reset(State)};
handle_call({'relay_to', RelayPid}, _From, State) ->
{'reply', 'ok', State#state{client_pid=RelayPid
,client_ref=erlang:monitor('process', RelayPid)
,client_from='relay'
}
};
handle_call({'stop_relay', RelayPid}, _From, #state{client_pid=RelayPid
,client_ref=Ref
,client_from='relay'
}=State) ->
erlang:demonitor(Ref, ['flush']),
lager:debug("stopping relay to ~p", [RelayPid]),
{'reply', 'ok', reset(State)};
handle_call({'request', ReqProp, PublishFun, VFun, Timeout}
,{ClientPid, _}=From
,#state{queue=Q, confirms=Confirm}=State
) ->
_ = kz_util:put_callid(ReqProp),
CallId = get('callid'),
MsgId = kz_api:msg_reply_id(ReqProp),
case send_request(CallId, Q, PublishFun, ReqProp) of
'ok' when Confirm =:= 'true' ->
lager:debug("published request with msg id ~s for ~p waiting for confirmation", [MsgId, ClientPid]),
{'noreply'
,State#state{client_pid = ClientPid
,client_ref = erlang:monitor('process', ClientPid)
,client_from = From
,client_vfun = VFun
,responses = 'undefined' % how we know not to collect any responses
,neg_resp_count = 0
,current_msg_id = MsgId
,confirm_timeout_ref = start_confirm_timeout(Timeout)
,confirm_start_time = os:timestamp()
,timeout = Timeout
,method = 'request'
,callid = CallId
}
};
'ok' ->
lager:debug("published request with msg id ~s for ~p", [MsgId, ClientPid]),
{'noreply'
,State#state{client_pid = ClientPid
,client_ref = erlang:monitor('process', ClientPid)
,client_from = From
,client_vfun = VFun
,responses = 'undefined' % how we know not to collect any responses
,neg_resp_count = 0
,current_msg_id = MsgId
,req_timeout_ref = start_req_timeout(Timeout)
,req_start_time = os:timestamp()
,callid = CallId
}
};
{'error', Err}=Error ->
lager:debug("failed to send request: ~p", [Err]),
{'reply', Error, reset(State)}
end;
handle_call({'call_collect', ReqProp, PublishFun, UntilFun, Timeout, Acc}
,{ClientPid, _}=From
,#state{queue=Q, confirms=Confirm}=State
) ->
_ = kz_util:put_callid(ReqProp),
CallId = get('callid'),
MsgId = kz_api:msg_reply_id(ReqProp),
case send_request(CallId, Q, PublishFun, ReqProp) of
'ok' when Confirm =:= 'true' ->
lager:debug("published request with msg id ~s for ~p waiting for confirmation", [MsgId, ClientPid]),
{'noreply'
,State#state{client_pid = ClientPid
,client_ref = erlang:monitor('process', ClientPid)
,client_from = From
,client_cfun = UntilFun
,acc = Acc
,responses = [] % how we know to collect all responses
,neg_resp_count = 0
,current_msg_id = MsgId
,confirm_timeout_ref = start_confirm_timeout(Timeout)
,confirm_start_time = os:timestamp()
,timeout = Timeout
,method = 'call_collect'
,callid = CallId
}
};
'ok' ->
lager:debug("published request with msg id ~s for ~p", [MsgId, ClientPid]),
{'noreply'
,State#state{client_pid = ClientPid
,client_ref = erlang:monitor('process', ClientPid)
,client_from = From
,client_cfun = UntilFun
,acc = Acc
,responses = [] % how we know to collect all responses
,neg_resp_count = 0
,current_msg_id = MsgId
,req_timeout_ref = start_req_timeout(Timeout)
,req_start_time = os:timestamp()
,callid = CallId
}
};
{'error', Err}=Error ->
lager:debug("failed to send request: ~p", [Err]),
{'reply', Error, reset(State)}
end;
handle_call({'publish', ReqProp0, PublishFun}
,{_Pid, _}=From
,#state{client_from='relay'
,queue=Queue
,confirms=Confirm
}=State
) ->
ReqProp = props:insert_value(?KEY_SERVER_ID, Queue, ReqProp0),
case publish_api(PublishFun, ReqProp) of
'ok' when Confirm =:= 'true' ->
{'noreply', State#state{confirm_timeout_ref = start_confirm_timeout(default_timeout())
,confirm_start_time = os:timestamp()
,method = 'publish'
,reply_to=From
}};
'ok' ->
lager:debug("published message ~s for ~p", [kz_api:msg_id(ReqProp), _Pid]),
{'reply', 'ok', State};
{'error', _E}=Err ->
lager:error("failed to publish message ~s for ~p: ~p", [kz_api:msg_id(ReqProp), _Pid, _E]),
{'reply', Err, State}
end;
handle_call({'publish', ReqProp, PublishFun}
,{Pid, _}=From
,#state{confirms=Confirm}=State
) ->
_ = kz_util:put_callid(ReqProp),
case publish_api(PublishFun, ReqProp) of
'ok' when Confirm =:= 'true' ->
lager:debug("published message ~s for ~p", [kz_api:msg_id(ReqProp), Pid]),
{'noreply'
,State#state{client_pid = Pid
,client_ref = erlang:monitor('process', Pid)
,client_from = From
,confirm_timeout_ref = start_confirm_timeout(default_timeout())
,confirm_start_time = os:timestamp()
,method = 'publish'
,req_start_time = os:timestamp()
}
};
'ok' ->
lager:debug("published message ~s for ~p", [kz_api:msg_id(ReqProp), Pid]),
{'reply', 'ok', reset(State)};
{'error', _E}=Err ->
lager:error("failed to publish message ~s for ~p: ~p", [kz_api:msg_id(ReqProp), Pid, _E]),
{'reply', Err, reset(State)}
end;
handle_call(_Request, _From, State) ->
{'reply', {'error', 'not_implemented'}, State}.
%%------------------------------------------------------------------------------
%% @doc Handling cast messages.
%% @end
%%------------------------------------------------------------------------------
-spec handle_cast(any(), state()) -> kz_types:handle_cast_ret_state(state()).
handle_cast('hibernate', State) ->
{'noreply', State, 'hibernate'};
handle_cast({'gen_listener', {'created_queue', Q}}, #state{defer='undefined'}=State) ->
{'noreply', State#state{queue=Q}};
handle_cast({'gen_listener', {'created_queue', Q}}, #state{defer={Call,From}}=State) ->
kz_util:put_callid(element(2, Call)),
lager:debug("resuming deferred call"),
case handle_call(Call, From, State#state{queue=Q}) of
{'reply', Reply, NewState} ->
gen_server:reply(From, Reply),
{'noreply', NewState};
{'noreply', _NewState}=NoReply ->
NoReply
end;
handle_cast({'set_negative_threshold', NegThreshold}, State) ->
lager:debug("set negative threshold to ~p", [NegThreshold]),
{'noreply', State#state{neg_resp_threshold = NegThreshold}, 'hibernate'};
handle_cast({'gen_listener', {'return', JObj, BasicReturn}}
,#state{current_msg_id = MsgId
,client_from = From
,confirms=Confirms
}=State) ->
_ = kz_util:put_callid(JObj),
case kz_api:msg_id(JObj) of
MsgId ->
lager:debug("published message was returned from the broker"),
gen_server:reply(From, {'returned', JObj, BasicReturn}),
{'noreply', reset(State), 'hibernate'};
_MsgId when Confirms =:= 'true' ->
lager:debug("published message was returned from the broker"),
gen_server:reply(From, {'returned', JObj, BasicReturn}),
{'noreply', reset(State), 'hibernate'};
_MsgId ->
lager:debug("ignoring published message was returned from the broker"),
lager:debug("payload: ~p", [JObj]),
lager:debug("return: ~p", [BasicReturn]),
{'noreply', State, 'hibernate'}
end;
handle_cast({'gen_listener', {'confirm', Msg}}, State) ->
handle_confirm(Msg, State);
handle_cast({'gen_listener',{'is_consuming',_IsConsuming}}, State) ->
{'noreply', State};
handle_cast({'gen_listener',{'server_confirms',ServerConfirms}}, State) ->
lager:debug("server confirms status : ~p", [ServerConfirms]),
{'noreply', State#state{confirms=ServerConfirms}};
handle_cast({'gen_listener',{'channel_flow', 'true'}}, State) ->
lager:debug("channel flow enabled"),
{'noreply', State#state{flow='true'}};
handle_cast({'gen_listener',{'channel_flow', 'false'}}, State) ->
lager:debug("channel flow disabled"),
{'noreply', State#state{flow='undefined'}};
handle_cast({'gen_listener',{'channel_flow_control', Active}}, State) ->
lager:debug("channel flow is ~p", [Active]),
{'noreply', State#state{flow=Active}};
handle_cast(_Msg, State) ->
lager:debug("unhandled cast: ~p", [_Msg]),
{'noreply', State, 'hibernate'}.
%%------------------------------------------------------------------------------
%% @doc Handling all non call/cast messages.
%% @end
%%------------------------------------------------------------------------------
-spec handle_info(any(), state()) -> kz_types:handle_info_ret_state(state()).
handle_info({'DOWN', ClientRef, 'process', _Pid, _Reason}
,#state{current_msg_id = _MsgId
,client_ref = ClientRef
,callid = CallId
}=State) ->
kz_util:put_callid(CallId),
lager:debug("requestor processes ~p died while waiting for msg id ~s", [_Pid, _MsgId]),
{'noreply', reset(State), 'hibernate'};
handle_info('timeout'
,#state{neg_resp=ErrorJObj
,neg_resp_count=Thresh
,neg_resp_threshold=Thresh
,client_from={_Pid, _}=From
,responses='undefined'
,defer_response=ReservedJObj
}=State) ->
case kz_term:is_empty(ReservedJObj) of
'true' ->
lager:debug("negative response threshold reached, returning last negative message to ~p", [_Pid]),
gen_server:reply(From, {'error', ErrorJObj});
'false' ->
lager:debug("negative response threshold reached, returning deferred response to ~p", [_Pid]),
gen_server:reply(From, {'ok', ReservedJObj})
end,
{'noreply', reset(State), 'hibernate'};
handle_info('timeout'
,#state{responses=Resps
,client_from=From
}=State) when is_list(Resps) ->
lager:debug("timeout reached, returning responses"),
gen_server:reply(From, {'error', Resps}),
{'noreply', reset(State), 'hibernate'};
handle_info('timeout', State) ->
{'noreply', State};
handle_info({'timeout', ReqRef, 'req_timeout'}
,#state{current_msg_id= _MsgId
,req_timeout_ref=ReqRef
,callid=CallId
,responses='undefined'
,client_from={_Pid, _}=From
,defer_response=ReservedJObj
}=State) ->
kz_util:put_callid(CallId),
case kz_term:is_empty(ReservedJObj) of
'true' ->
lager:debug("request timeout exceeded for msg id: ~s and client: ~p", [_MsgId, _Pid]),
gen_server:reply(From, {'error', 'timeout'});
'false' ->
lager:debug("only received deferred response for msg id: ~s and client: ~p", [_MsgId, _Pid]),
gen_server:reply(From, {'ok', ReservedJObj})
end,
{'noreply', reset(State), 'hibernate'};
handle_info({'timeout', ReqRef, 'req_timeout'}
,#state{responses=Resps
,req_timeout_ref=ReqRef
,client_from=From
,callid=CallId
}=State) ->
kz_util:put_callid(CallId),
lager:debug("req timeout for call_collect"),
gen_server:reply(From, {'timeout', Resps}),
{'noreply', reset(State), 'hibernate'};
handle_info({'timeout', ReqRef, 'confirm_timeout'}, #state{confirm_timeout_ref=ReqRef}=State) ->
handle_publish_timeout(State);
handle_info(_Info, State) ->
lager:debug("unhandled message: ~p", [_Info]),
{'noreply', State}.
%%------------------------------------------------------------------------------
%% @doc Allows listener to pass options to handlers.
%% @end
%%------------------------------------------------------------------------------
-spec handle_event(kz_json:object(), state()) -> gen_listener:handle_event_return().
handle_event(JObj, #state{client_from='relay'
,client_pid=Pid
}) ->
relay_event(Pid, JObj),
lager:debug("relayed event to ~p", [Pid]),
'ignore';
handle_event(JObj, State) ->
case handle_payload(kz_api:msg_id(JObj), JObj, State) of
{'noreply', NewState} -> {'ignore', NewState};
{'noreply', NewState, 'hibernate'} ->
gen_listener:cast(self(), 'hibernate'),
{'ignore', NewState}
end.
%%------------------------------------------------------------------------------
%% @doc This function is called by a `gen_server' when it is about to
%% terminate. It should be the opposite of `Module:init/1' and do any
%% necessary cleaning up. When it returns, the `gen_server' terminates
%% with Reason. The return value is ignored.
%%
%% @end
%%------------------------------------------------------------------------------
-spec terminate(any(), state()) -> 'ok'.
terminate(_Reason, _State) ->
lager:debug("amqp worker terminating: ~p", [_Reason]).
%%------------------------------------------------------------------------------
%% @doc Convert process state when code is changed.
%% @end
%%------------------------------------------------------------------------------
-spec code_change(any(), state(), any()) -> {'ok', state()}.
code_change(_OldVsn, State, _Extra) ->
{'ok', State}.
%%%=============================================================================
%%% Internal functions
%%%=============================================================================
%%------------------------------------------------------------------------------
%% @doc
%% @end
%%------------------------------------------------------------------------------
-spec reset(#state{}) -> #state{}.
reset(#state{req_timeout_ref = ReqRef
,client_ref = ClientRef
,confirm_timeout_ref = ConfirmRef
}=State) ->
kz_util:put_callid(?DEFAULT_LOG_SYSTEM_ID),
_ = case is_reference(ReqRef) of
'true' -> erlang:cancel_timer(ReqRef);
'false' -> 'ok'
end,
_ = case is_reference(ClientRef) of
'true' -> erlang:demonitor(ClientRef, ['flush']);
'false' -> 'ok'
end,
_ = case is_reference(ConfirmRef) of
'true' -> erlang:cancel_timer(ConfirmRef);
'false' -> 'ok'
end,
State#state{client_pid = 'undefined'
,client_ref = 'undefined'
,client_from = 'undefined'
,client_vfun = 'undefined'
,client_cfun = collect_until_timeout()
,neg_resp = 'undefined'
,neg_resp_count = 0
,current_msg_id = 'undefined'
,req_timeout_ref = 'undefined'
,req_start_time = 'undefined'
,callid = 'undefined'
,defer_response = 'undefined'
,responses = 'undefined'
,confirm_timeout_ref = 'undefined'
,confirm_start_time = 'undefined'
,method = 'undefined'
}.
-spec fudge_timeout(timeout()) -> timeout().
fudge_timeout('infinity'=T) -> T;
fudge_timeout(T) -> T + ?FUDGE.
-spec start_req_timeout(timeout()) -> reference().
start_req_timeout('infinity') -> make_ref();
start_req_timeout(Timeout) ->
erlang:start_timer(Timeout, self(), 'req_timeout').
-spec start_confirm_timeout(timeout()) -> reference().
start_confirm_timeout('infinity') -> make_ref();
start_confirm_timeout(Timeout) ->
erlang:start_timer(Timeout, self(), 'confirm_timeout').
-spec maybe_convert_to_proplist(kz_term:proplist() | kz_json:object()) -> kz_term:proplist().
maybe_convert_to_proplist(Req) ->
case kz_json:is_json_object(Req) of
'true' -> maybe_set_msg_id(kz_json:to_proplist(Req));
'false' -> maybe_set_msg_id(Req)
end.
-spec maybe_set_msg_id(kz_term:proplist()) -> kz_term:proplist().
maybe_set_msg_id(Props) ->
case kz_api:msg_id(Props) of
'undefined' ->
props:set_value(<<"Msg-ID">>, kz_binary:rand_hex(8), Props);
_MsgId ->
Props
end.
-spec publish_api(fun(), kz_term:api_terms()) -> 'ok' | {'error', any()}.
publish_api(PublishFun, ReqProps) ->
try PublishFun(ReqProps) of
'ok' -> 'ok';
{'error', _E}=Err -> Err;
Other ->
lager:error("publisher fun returned ~p instead of 'ok'", [Other]),
{'error', Other}
catch
'error':'badarg' ->
ST = erlang:get_stacktrace(),
lager:error("badarg error when publishing:"),
kz_util:log_stacktrace(ST),
{'error', 'badarg'};
'error':'function_clause' ->
ST = erlang:get_stacktrace(),
lager:error("function clause error when publishing:"),
kz_util:log_stacktrace(ST),
lager:error("pub fun: ~p", [PublishFun]),
{'error', 'function_clause'};
_E:R ->
lager:error("error when publishing: ~s:~p", [_E, R]),
{'error', R}
end.
-type relay_fun() :: fun((pid() | atom(), any()) -> any()).
-spec relay_event(pid(), kz_json:object()) -> any().
relay_event(Pid, JObj) ->
relay_event(Pid, JObj, fun erlang:send/2).
-spec relay_event(pid(), kz_json:object(), relay_fun()) -> any().
relay_event(Pid, JObj, RelayFun) ->
RelayFun(Pid, {'amqp_msg', JObj}).
-spec worker_pool(atom()) -> atom().
worker_pool(Pool) ->
put('$kz_amqp_worker_pool', Pool).
-spec worker_pool() -> atom().
worker_pool() ->
case get('$kz_amqp_worker_pool') of
'undefined' -> kz_amqp_sup:pool_name();
Pool -> Pool
end.
-spec handle_publish_timeout(state()) -> kz_types:handle_info_ret_state(state()).
handle_publish_timeout(#state{client_from='relay'
,reply_to=From
}=State) ->
lager:debug("timeout waiting for server confirmation"),
gen_server:reply(From, {'error', <<"timeout receiving server confirmation">>}),
{'noreply', State};
handle_publish_timeout(#state{client_from=From}=State) ->
lager:debug("timeout waiting for server confirmation"),
gen_server:reply(From, {'error', <<"timeout receiving server confirmation">>}),
{'noreply', reset(State), 'hibernate'}.
-spec handle_confirm(#'basic.ack'{} | #'basic.nack'{}, state()) -> kz_types:handle_cast_ret_state(state()).
handle_confirm(_Msg, #state{client_from='undefined'}=State) ->
lager:debug("confirm message was returned from the broker but it was too late : ~p",[_Msg]),
{'noreply', reset(State), 'hibernate'};
handle_confirm(Msg, #state{method=Method
,confirm_timeout_ref = ConfirmRef
}=State) ->
_ = erlang:cancel_timer(ConfirmRef),
handle_method_confirm(Method, Msg, State#state{confirm_timeout_ref = 'undefined'}).
-spec handle_method_confirm(atom(), #'basic.ack'{} | #'basic.nack'{}, state()) -> kz_types:handle_cast_ret_state(state()).
handle_method_confirm('publish', #'basic.ack'{}, #state{client_from='relay'
,reply_to=From
}=State) ->
lager:debug("published message was confirmed by the broker"),
gen_server:reply(From, 'ok'),
{'noreply', State};
handle_method_confirm('publish', #'basic.ack'{}, #state{client_from=From}=State) ->
lager:debug("published message was confirmed by the broker"),
gen_server:reply(From, 'ok'),
{'noreply', reset(State), 'hibernate'};
handle_method_confirm('publish', #'basic.nack'{}, #state{client_from='relay'
,reply_to=From
}=State) ->
lager:debug("published message was declined by the broker"),
gen_server:reply(From, {'error', <<"server declined message">>}),
{'noreply', State};
handle_method_confirm('publish', #'basic.nack'{}, #state{client_from=From}=State) ->
lager:debug("published message was declined by the broker"),
gen_server:reply(From, {'error', <<"server declined message">>}),
{'noreply', reset(State), 'hibernate'};
handle_method_confirm('request', #'basic.ack'{}, #state{client_pid=ClientPid
,current_msg_id = MsgId
,timeout=Timeout
}=State) ->
lager:debug("request for msg id ~s for ~p was confirmed by broker", [MsgId, ClientPid]),
NewState = State#state{req_timeout_ref = start_req_timeout(Timeout)
,req_start_time = os:timestamp()
},
{'noreply', NewState};
handle_method_confirm('request', #'basic.nack'{}, #state{client_pid=ClientPid
,current_msg_id = MsgId
,client_from=From
}=State) ->
lager:debug("request for msg id ~s for ~p was declined by broker", [MsgId, ClientPid]),
gen_server:reply(From, {'error', <<"server declined message">>}),
{'noreply', reset(State), 'hibernate'};
handle_method_confirm('call_collect', #'basic.ack'{}, #state{client_pid=ClientPid
,current_msg_id = MsgId
,timeout=Timeout
}=State) ->
lager:debug("call collect for msg id ~s for ~p was confirmed by broker", [MsgId, ClientPid]),
NewState = State#state{req_timeout_ref = start_req_timeout(Timeout)
,req_start_time = os:timestamp()
},
{'noreply', NewState};
handle_method_confirm('call_collect', #'basic.nack'{}, #state{client_pid=ClientPid
,current_msg_id = MsgId
,client_from=From
}=State) ->
lager:debug("call collect for msg id ~s for ~p was declined by broker", [MsgId, ClientPid]),
gen_server:reply(From, {'error', <<"server nack">>}),
{'noreply', reset(State), 'hibernate'}.
-spec handle_payload(kz_term:ne_binary(), kz_json:object(), state()) -> kz_types:handle_cast_ret_state(state()).
handle_payload(MsgId, JObj
,#state{current_msg_id = MsgId
,client_from = From
,client_vfun = VFun
,responses = 'undefined'
,req_start_time = StartTime
,neg_resp_count = NegCount
,neg_resp_threshold = NegThreshold
}=State) when NegCount < NegThreshold ->
_ = kz_util:put_callid(JObj),
case VFun(JObj) of
'true' ->
case kz_json:is_true(<<"Defer-Response">>, JObj) of
'false' ->
lager:debug("response for msg id ~s took ~b micro to return", [MsgId, timer:now_diff(os:timestamp(), StartTime)]),
gen_server:reply(From, {'ok', JObj}),
{'noreply', reset(State), 'hibernate'};
'true' ->
lager:debug("deferred response for msg id ~s, waiting for primary response", [MsgId]),
{'noreply', State#state{defer_response=JObj}, 'hibernate'}
end;
'false' ->
case kz_json:is_true(<<"Defer-Response">>, JObj) of
'true' ->
lager:debug("ignoring invalid resp as it was deferred"),
{'noreply', State};
'false' ->
lager:debug("response failed validator, waiting for more responses"),
{'noreply', State#state{neg_resp_count = NegCount + 1
,neg_resp=JObj
}}
end
end;
handle_payload(MsgId, JObj
,#state{current_msg_id = MsgId
,client_from = From
,client_cfun = UntilFun
,responses = Resps
,acc = Acc
,req_start_time = StartTime
}=State)
when is_list(Resps)
andalso is_function(UntilFun, 2) ->
_ = kz_util:put_callid(JObj),
lager:debug("recv message ~s", [MsgId]),
Responses = [JObj | Resps],
try UntilFun(Responses, Acc) of
'true' ->
lager:debug("responses have apparently met the criteria for the client, returning"),
lager:debug("response for msg id ~s took ~bμs to return"
,[MsgId, kz_time:elapsed_us(StartTime)]
),
gen_server:reply(From, {'ok', Responses}),
{'noreply', reset(State), 'hibernate'};
'false' ->
{'noreply', State#state{responses=Responses}, 'hibernate'};
{'false', Acc0} ->
{'noreply', State#state{responses=Responses, acc=Acc0}, 'hibernate'}
catch
_E:_R ->
lager:warning("supplied until_fun crashed: ~s: ~p", [_E, _R]),
lager:debug("pretending like until_fun returned false"),
{'noreply', State#state{responses=Responses}, 'hibernate'}
end;
handle_payload(MsgId, JObj
,#state{current_msg_id = MsgId
,client_from = From
,client_cfun = UntilFun
,responses = Resps
,req_start_time = StartTime
}=State) when is_list(Resps) ->
_ = kz_util:put_callid(JObj),
lager:debug("recv message ~s", [MsgId]),
Responses = [JObj | Resps],
try UntilFun(Responses) of
'true' ->
lager:debug("responses have apparently met the criteria for the client, returning"),
lager:debug("response for msg id ~s took ~bus to return"
,[MsgId, kz_time:elapsed_us(StartTime)]
),
gen_server:reply(From, {'ok', Responses}),
{'noreply', reset(State), 'hibernate'};
'false' ->
{'noreply', State#state{responses=Responses}, 'hibernate'}
catch
_E:_R ->
lager:warning("supplied until_fun crashed: ~s: ~p", [_E, _R]),
lager:debug("pretending like until_fun returned false"),
{'noreply', State#state{responses=Responses}, 'hibernate'}
end;
handle_payload(_MsgId, JObj
,#state{current_msg_id=_CurrMsgId}=State) ->
_ = kz_util:put_callid(JObj),
lager:debug("received unexpected message with old/expired message id: ~s, waiting for ~s", [_MsgId, _CurrMsgId]),
{'noreply', State}.