Skip to content

Commit 3bebb6e

Browse files
committed
Reconnect on send for tcp
If the listening socket is closed when :tcp_closed is received, then try to reconnect again on each send.
1 parent a7e1f8e commit 3bebb6e

File tree

2 files changed

+87
-17
lines changed

2 files changed

+87
-17
lines changed

lib/logger_logstash_backend.ex

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ defmodule LoggerLogstashBackend do
6363
end
6464

6565
def handle_info({:tcp_closed, socket}, state = %__MODULE__{host: host, port: port, socket: socket}) do
66-
with {:error, _} <- configure_socket(state, host, port) do
67-
{:ok, state}
66+
with {:error, reason} <- configure_socket(state, host, port) do
67+
{:ok, %{state | socket: nil}}
6868
end
6969
end
7070

@@ -103,15 +103,7 @@ defmodule LoggerLogstashBackend do
103103
fields: fields
104104
}
105105

106-
with {:error, reason} <- Socket.send(state, [json, "\n"]) do
107-
# fallback in case TCP configuration is bad
108-
IO.puts :stderr,
109-
"Could not log message (#{json}) to socket (#{url(state)}) due to #{inspect reason}. " <>
110-
"Check that state (#{inspect state}) is correct."
111-
end
112-
113-
114-
{:ok, state}
106+
send_with_retry(state, json)
115107
end
116108

117109
defp configure(name, opts) when is_atom(name) and is_list(opts) do
@@ -131,15 +123,22 @@ defmodule LoggerLogstashBackend do
131123
configure_socket(state, host, port)
132124
end
133125

134-
defp configure_socket(state, host, port) when is_nil(host) or is_nil(port), do: {:ok, state}
135-
defp configure_socket(state, host, port) do
126+
defp configure_socket(state = %__MODULE__{host: host, port: port}) do
136127
with reply = {:error, reason} <- Socket.open %{state | host: to_char_list(host), port: port} do
137128
IO.puts :stderr, "Could not open socket (#{url(state)}) due to reason #{inspect reason}"
138129

139130
reply
140131
end
141132
end
142133

134+
defp configure_socket(state, host, port) when is_nil(host) or is_nil(port), do: {:ok, state}
135+
defp configure_socket(state, host, port), do: configure_socket %{state | host: to_char_list(host), port: port}
136+
137+
defp fallback_log(state, json, reason) do
138+
IO.puts :stderr,
139+
"Could not log message (#{json}) to socket (#{url(state)}) due to #{inspect reason}. " <>
140+
"Check that state (#{inspect state}) is correct."
141+
end
143142

144143
# inspects the argument only if it is a pid
145144
defp inspect_pid(pid) when is_pid(pid), do: inspect(pid)
@@ -152,6 +151,31 @@ defmodule LoggerLogstashBackend do
152151
end
153152
end
154153

154+
defp send_with_retry(state = %__MODULE__{socket: nil}, json), do: send_with_new_socket(state, json)
155+
defp send_with_retry(state, json) do
156+
case Socket.send(state, [json, "\n"]) do
157+
:ok ->
158+
{:ok, state}
159+
{:error, :closed} ->
160+
send_with_new_socket(state, json)
161+
{:error, reason} ->
162+
fallback_log(state, json, reason)
163+
164+
{:ok, state}
165+
end
166+
end
167+
168+
defp send_with_new_socket(state, json) do
169+
case configure_socket(state) do
170+
{:error, reason} ->
171+
fallback_log(state, json, reason)
172+
173+
{:ok, state}
174+
{:ok, new_state} ->
175+
send_with_retry(new_state, json)
176+
end
177+
end
178+
155179
defp url(%__MODULE__{host: host, port: port, protocol: protocol}) do
156180
"#{protocol}://#{host}:#{port}"
157181
end

test/logger_logstash_backend/tcp_test.exs

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,21 @@ defmodule LoggerLogstashBackend.TCPTest do
1919
use Timex
2020

2121
import ExUnit.CaptureIO
22+
import ExUnit.CaptureLog
23+
24+
# Functions
25+
26+
def accept(listen_socket), do: :gen_tcp.accept(listen_socket, 1_000)
27+
28+
def listen(port \\ 0) do
29+
:gen_tcp.listen port, [:binary, {:active, true}, {:ip, {127, 0, 0, 1}}, {:reuseaddr, true}]
30+
end
2231

2332
# Callbacks
2433

2534
setup context = %{line: line} do
2635
# have to open socket before configure_backend, so that it is listening when connect happens
27-
{:ok, listen_socket} = :gen_tcp.listen 0, [:binary, {:active, true}, {:ip, {127, 0, 0, 1}}, {:reuseaddr, true}]
36+
{:ok, listen_socket} = listen
2837
{:ok, port} = :inet.port(listen_socket)
2938

3039
backend = {LoggerLogstashBackend, String.to_atom("#{inspect __MODULE__}#{line}")}
@@ -41,7 +50,7 @@ defmodule LoggerLogstashBackend.TCPTest do
4150
protocol: :tcp
4251
]
4352

44-
{:ok, accept_socket} = :gen_tcp.accept(listen_socket, 1_000)
53+
{:ok, accept_socket} = accept(listen_socket)
4554

4655
on_exit fn ->
4756
Logger.remove_backend backend
@@ -52,6 +61,7 @@ defmodule LoggerLogstashBackend.TCPTest do
5261
full_context = context
5362
|> Map.put(:accept_socket, accept_socket)
5463
|> Map.put(:listen_socket, listen_socket)
64+
|> Map.put(:port, port)
5565

5666
{:ok, full_context}
5767
end
@@ -68,7 +78,7 @@ defmodule LoggerLogstashBackend.TCPTest do
6878
assert fields["function"] == "test can log/1"
6979
assert fields["key1"] == "field1"
7080
assert fields["level"] == "info"
71-
assert fields["line"] == 60
81+
assert fields["line"] == 70
7282
assert fields["module"] == to_string(__MODULE__)
7383
assert fields["pid"] == inspect(self)
7484
assert fields["some_metadata"] == "go here"
@@ -92,7 +102,7 @@ defmodule LoggerLogstashBackend.TCPTest do
92102
assert fields["function"] == "test can log pids/1"
93103
assert fields["pid_key"] == inspect(self)
94104
assert fields["level"] == "info"
95-
assert fields["line"] == 84
105+
assert fields["line"] == 94
96106
assert fields["module"] == to_string(__MODULE__)
97107
assert fields["pid"] == inspect(self)
98108
assert fields["some_metadata"] == "go here"
@@ -115,6 +125,42 @@ defmodule LoggerLogstashBackend.TCPTest do
115125
assert {:ok, _} = :gen_tcp.accept(listen_socket, 1_000)
116126
end
117127

128+
test "it reconnects on send if disconnected and listening socket is closed " <>
129+
"when handle_info({:tcp_closed, _}, _) is called",
130+
%{accept_socket: accept_socket, listen_socket: listen_socket, port: port} do
131+
:ok = :gen_tcp.close listen_socket
132+
:ok = :gen_tcp.close accept_socket
133+
134+
Logger.info "Can't connect"
135+
:timer.sleep 100
136+
137+
{:ok, new_listen_socket} = listen(port)
138+
139+
# Need to accept and wait in a separate process because connect won't come until the Logger.info call tries to
140+
# establish a new socket
141+
pid = self
142+
spawn_link fn ->
143+
{:ok, new_accept_socket} = accept(new_listen_socket)
144+
145+
send pid, {:new_accept_socket, new_accept_socket}
146+
147+
receive do
148+
# forward to pid so that get_log! works as normal
149+
message = {:tcp, _, _} -> send pid, message
150+
end
151+
end
152+
153+
Logger.info "I reconnected"
154+
155+
assert_receive {:new_accept_socket, _}
156+
157+
:timer.sleep 100
158+
159+
{:ok, data} = JSX.decode get_log!
160+
161+
assert data["message"] == "I reconnected"
162+
end
163+
118164
test "if it can't reconnect, then is prints to :stderr",
119165
%{accept_socket: accept_socket, listen_socket: listen_socket} do
120166
:ok = :gen_tcp.close listen_socket

0 commit comments

Comments
 (0)