Skip to content

Commit 0d2f530

Browse files
committed
pool: fix race conditions
there was a possible race codition when creating/destroying a connection ID. This changes simply cache a hash of the connection to make it more consistent. We keep the number of items in the cache to 1000. * changes - refactor hackney_connections to be a simple LRU cache - fix hackney_connection and hackney_pool modules
1 parent 748f9ec commit 0d2f530

File tree

4 files changed

+108
-53
lines changed

4 files changed

+108
-53
lines changed

rebar.lock

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
{"1.1.0",
22
[{<<"certifi">>,{pkg,<<"certifi">>,<<"2.5.2">>},0},
33
{<<"idna">>,{pkg,<<"idna">>,<<"6.0.1">>},0},
4+
{<<"lru">>,{pkg,<<"lru">>,<<"2.4.0">>},0},
45
{<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},0},
56
{<<"mimerl">>,{pkg,<<"mimerl">>,<<"1.2.0">>},0},
67
{<<"parse_trans">>,{pkg,<<"parse_trans">>,<<"3.3.0">>},0},
@@ -10,6 +11,7 @@
1011
{pkg_hash,[
1112
{<<"certifi">>, <<"B7CFEAE9D2ED395695DD8201C57A2D019C0C43ECAF8B8BCB9320B40D6662F340">>},
1213
{<<"idna">>, <<"1D038FB2E7668CE41FBF681D2C45902E52B3CB9E9C77B55334353B222C2EE50C">>},
14+
{<<"lru">>, <<"A8F9967CA9B6F260BAA19E2EFB2AEB3853A3F5BD5F8416F537A672294B38C1BC">>},
1315
{<<"metrics">>, <<"25F094DEA2CDA98213CECC3AEFF09E940299D950904393B2A29D191C346A8486">>},
1416
{<<"mimerl">>, <<"67E2D3F571088D5CFD3E550C383094B47159F3EEE8FFA08E64106CDF5E981BE3">>},
1517
{<<"parse_trans">>, <<"09765507A3C7590A784615CFD421D101AEC25098D50B89D7AA1D66646BC571C1">>},

src/hackney_connection.erl

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33

44
-export([new/1,
5-
release/1,
65
get_property/2,
76
is_ssl/1]).
87

@@ -27,13 +26,10 @@ new(#client{transport=Transport,
2726
Host1 = string_compat:to_lower(Host0),
2827
ConnectOptions = connect_options(Transport, Host1, ClientOptions),
2928
Tunnel = maybe_tunnel(Transport),
30-
Id = hackney_connections:new_connection_id(Transport, Host1, Port, ConnectOptions),
29+
Id = new_connection_id(Transport, Host1, Port, ConnectOptions),
3130
Connection = new_connection_r(Transport, Host1, Port, Id, Tunnel),
3231
{Connection, ConnectOptions}.
3332

34-
release(#connection{id=Id}) ->
35-
hackney_connections:release_connection_id(Id).
36-
3733
get_property(transport, #connection{transport=Transport}) -> Transport;
3834
get_property(host, #connection{host=Host}) -> Host;
3935
get_property(port, #connection{port=Port}) -> Port;
@@ -45,7 +41,6 @@ is_ssl(#connection{transport=hackney_ssl}) -> true;
4541
is_ssl(#connection{}) -> false;
4642
is_ssl(_) -> erlang:error(badarg).
4743

48-
4944
controlling_process(#connection{transport=Transport}, Socket, Owner) ->
5045
Transport:controlling_process(Socket, Owner).
5146

@@ -77,6 +72,18 @@ close(#connection{transport=Transport}, Socket) ->
7772
Transport:close(Socket).
7873

7974

75+
new_connection_id(Transport, Host, Port, ConnectionOptions) ->
76+
Key = {Transport, Host, Port, ConnectionOptions},
77+
case hackney_connections:lookup(Key) of
78+
{ok, Id} ->
79+
Id;
80+
error ->
81+
Id = erlang:phash2(Key),
82+
hackney_connections:insert(Key, Id),
83+
Id
84+
end.
85+
86+
8087
new_connection_r(Transport, Host, Port, Id, Tunnel) ->
8188
#connection{transport=Transport,
8289
host=Host,

src/hackney_connections.erl

Lines changed: 93 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
-module(hackney_connections).
22
-behaviour(gen_server).
33

4+
%% API
5+
-export([
6+
insert/2,
7+
lookup/1,
8+
delete/1,
9+
get_num_entries/0
10+
]).
411

5-
-export([new_connection_id/4,
6-
release_connection_id/1]).
712

813
-export([start_link/0]).
914

@@ -19,60 +24,69 @@
1924

2025
-define(SERVER, ?MODULE).
2126

27+
-record(state, {tab,lru}).
28+
-define(MAX_ITEMS, 1000).
2229

23-
new_connection_id(Transport, Host, Port, ConnectOptions) ->
24-
ConnectionKey = {Transport, Host, Port, ConnectOptions},
25-
gen_server:call(?SERVER, {new_session_id, ConnectionKey}).
2630

31+
insert(Key, Id) ->
32+
gen_server:call(?SERVER, {insert, Key, Id}).
2733

28-
release_connection_id(Id) ->
29-
gen_server:cast(?SERVER, {release_connection_id, Id}).
34+
lookup(Key) ->
35+
gen_server:call(?SERVER, {lookup, Key}).
3036

37+
delete(Key) ->
38+
gen_server:call(?SERVER, {delete, Key}).
3139

40+
get_num_entries() ->
41+
gen_server:call(?SERVER, num_entries).
3242

3343
start_link() ->
3444
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
3545

3646

3747
init([]) ->
38-
_ = ets:new(?CONNECTIONS, [set, named_table, public,
39-
{read_concurrency, true},
40-
{write_concurrency, true}]),
41-
{ok, {}}.
48+
Tab = ets:new(?SERVER, [set, {read_concurrency, true}]),
49+
{ok, #state{tab=Tab, lru=new_lrulist()}}.
4250

4351

44-
handle_call({new_session_id, ConnectionKey}, _From, State) ->
45-
case ets:lookup(?CONNECTIONS, ConnectionKey) of
46-
[{_, Id}] ->
47-
ets:update_counter(?CONNECTIONS, Id, {3, 1}),
48-
{reply, Id, State};
52+
53+
handle_call({insert, Key, Id}, From, #state{tab=Tab, lru=Lru} = State) ->
54+
true = ets:insert(Tab, {Key, Id}),
55+
gen_server:reply(From, true),
56+
Lru2 = push(Key, Lru),
57+
NewState = case has_exceeded(State) of
58+
true ->
59+
evict(State#state{lru=Lru2});
60+
false ->
61+
State#state{lru=Lru2 }
62+
end,
63+
{noreply, NewState};
64+
65+
66+
handle_call({lookup, Key}, From, State = #state{tab=Tab, lru=Lru}) ->
67+
case ets:lookup(Tab, Key) of
68+
[{Key, Id}] ->
69+
gen_server:reply(From, {ok, Id}),
70+
Lru2 = touch(Key, Lru),
71+
{noreply, State#state{lru=Lru2}};
4972
[] ->
50-
Id = make_ref(),
51-
ets:insert(?CONNECTIONS, [{ConnectionKey, Id},
52-
{Id, ConnectionKey, 1}]),
53-
{reply, Id, State}
73+
{reply, error, State}
5474
end;
55-
handle_call(_Msg, _From, State) ->
56-
{reply, badarg, State}.
5775

76+
handle_call({delete, Key}, From, State = #state{tab=Tab, lru=Lru}) ->
77+
_ = ets:delete(Tab, Key),
78+
gen_server:reply(From, ok),
79+
Lru2 = drop(Key, Lru),
80+
{reply, true, State#state{lru=Lru2}};
81+
82+
83+
handle_call(num_entries, _From, State = #state{tab=Tab}) ->
84+
{reply, ets:info(Tab, size) , State};
85+
86+
87+
handle_call(_Msg, _From, State) ->
88+
{reply, {error, bad_call}, State}.
5889

59-
handle_cast({release_connection_id, Id}, State) ->
60-
RefCount = try ets:update_counter(?CONNECTIONS, Id, {3, -1})
61-
catch
62-
error:badarg -> -1
63-
end,
64-
if
65-
RefCount =< 0 ->
66-
case ets:take(?CONNECTIONS, Id) of
67-
[{_, ConnKey, _}] ->
68-
ets:delete(?CONNECTIONS, ConnKey);
69-
[] ->
70-
ok
71-
end;
72-
true ->
73-
ok
74-
end,
75-
{noreply, State};
7690
handle_cast(_Msg, State) ->
7791
{noreply, State}.
7892

@@ -88,3 +102,43 @@ code_change(_OldVsn, State, _Extra) ->
88102
terminate(_Reason, _State) ->
89103
ok.
90104

105+
106+
107+
has_exceeded(#state{ tab = Tab}) ->
108+
(ets:info(Tab, size) > ?MAX_ITEMS).
109+
110+
evict(State = #state{tab=Tab, lru=Lru}) ->
111+
case pop(Lru) of
112+
{ok, Key, Lru2} ->
113+
_ = ets:delete(Tab, Key),
114+
State#state{lru=Lru2};
115+
empty ->
116+
State
117+
end.
118+
119+
120+
%% ========
121+
%% LRU LIST
122+
123+
124+
new_lrulist() -> [].
125+
126+
push(LruElement, LruList) ->
127+
case lists:member(LruElement, LruList) of
128+
true -> LruList;
129+
false -> LruList ++ [LruElement]
130+
end.
131+
132+
pop([]) -> empty;
133+
pop([ El | Rest] ) -> {ok, El, Rest}.
134+
135+
touch(LruElement, [LruElement|Rest]) ->
136+
Rest ++ [LruElement];
137+
touch(LruElement, [H|Rest]) ->
138+
[H|touch(LruElement, Rest)];
139+
touch(LruElement, []) ->
140+
[LruElement].
141+
142+
drop(LruElement, LruList) ->
143+
lists:delete(LruElement, LruList).
144+

src/hackney_pool.erl

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -91,28 +91,23 @@ checkout(Host, _Port, Transport, #client{options=Opts,
9191

9292
{ok, {PoolName, RequestRef, Connection, Owner, Transport}, Socket};
9393
{error, timeout} ->
94-
hackney_connections:release_connection_id(Connection),
9594
_ = metrics:increment_counter(Metrics, [hackney, Host, connect_timeout]),
9695
{error, timeout};
9796
Error ->
9897
?report_trace("connect error", []),
99-
hackney_connections:release_connection_id(Connection),
10098
_ = metrics:increment_counter(Metrics, [hackney, Host, connect_error]),
10199
Error
102100
end;
103101
{error, Reason} ->
104-
hackney_connections:release_connection_id(Connection),
105102
{error, Reason};
106103
{'EXIT', {timeout, _}} ->
107-
hackney_connections:release_connection_id(Connection),
108104
% socket will still checkout so to avoid deadlock we send in a cancellation
109105
gen_server:cast(Pool, {checkout_cancel, Connection, RequestRef}),
110106
{error, checkout_timeout}
111107
end.
112108

113109
%% @doc release a socket in the pool
114110
checkin({_Name, Ref, Connection, Owner, Transport}, Socket) ->
115-
hackney_connection:release(Connection),
116111
hackney_connection:setopts(Connection, Socket, [{active, false}]),
117112
case hackney_connection:sync_socket(Connection, Socket) of
118113
true ->
@@ -401,7 +396,6 @@ dequeue(Dest, Ref, State) ->
401396
empty ->
402397
State#state{clients = Clients2};
403398
{ok, {From, Ref2}, Queues2} ->
404-
hackney_connections:release_connection_id(Dest),
405399
Pending2 = del_pending(Ref, Pending),
406400
_ = metrics:update_histogram(
407401
State#state.metrics, [hackney_pool, State#state.name, queue_count], dict:size(Pending2)
@@ -451,7 +445,6 @@ remove_socket(Socket, #state{connections=Conns, sockets=Sockets}=State) ->
451445
{ok, {Connection, Timer}} ->
452446
cancel_timer(Socket, Timer),
453447
catch hackney_connection:close(Connection, Socket),
454-
hackney_connections:release_connection_id(Connection),
455448
ConnSockets = lists:delete(Socket, dict:fetch(Connection, Conns)),
456449
NewConns = update_connections(ConnSockets, Connection, Conns),
457450
NewSockets = dict:erase(Socket, Sockets),
@@ -562,7 +555,6 @@ deliver_socket(Socket, Connection, State) ->
562555
gen_server:reply(FromWaiter, {ok, Socket, self()}),
563556
monitor_client(Connection, Ref, State#state{queues = Queues2, pending=Pending2});
564557
_Error ->
565-
hackney_connection:release(Connection),
566558
% Something wrong, close the socket
567559
_ = (catch hackney_connection:close(Connection, Socket)),
568560
%% and let the waiter connect to a new one

0 commit comments

Comments
 (0)