From 36cdc524415bbedc5613960a314ed4d8a5f845ae Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Sun, 19 Jun 2016 16:11:51 -0500 Subject: [PATCH 1/8] Allow tcp or udp sockets --- lib/logger_logstash_backend.ex | 99 ++++++++++++--- lib/logger_logstash_backend/socket.ex | 50 ++++++++ test/logger_logstash_backend/tcp_test.exs | 118 ++++++++++++++++++ .../udp_test.exs} | 67 +++++----- 4 files changed, 280 insertions(+), 54 deletions(-) create mode 100644 lib/logger_logstash_backend/socket.ex create mode 100644 test/logger_logstash_backend/tcp_test.exs rename test/{logger_logstash_backend_test.exs => logger_logstash_backend/udp_test.exs} (67%) diff --git a/lib/logger_logstash_backend.ex b/lib/logger_logstash_backend.ex index 2c4106f..e02fe2b 100644 --- a/lib/logger_logstash_backend.ex +++ b/lib/logger_logstash_backend.ex @@ -14,15 +14,42 @@ # limitations under the License. ################################################################################ defmodule LoggerLogstashBackend do + alias LoggerLogstashBackend.Socket + use GenEvent use Timex + # Struct + + defstruct ~w( + host + level + metadata + name + port + protocol + socket + type + )a + + # Functions + + ## GenEvent callbacks + def init({__MODULE__, name}) do - {:ok, configure(name, [])} + # trap exits, so that cleanup can occur in terminate/2 + Process.flag(:trap_exit, true) + + configure(name, []) end def handle_call({:configure, opts}, %{name: name}) do - {:ok, :ok, configure(name, opts)} + case configure(name, opts) do + {:ok, state} -> + {:ok, :ok, state} + reply = {:error, _reason} -> + {:remove_handler, reply} + end end def handle_event( @@ -30,17 +57,26 @@ defmodule LoggerLogstashBackend do ) do if is_nil(min_level) or Logger.compare_levels(level, min_level) != :lt do log_event level, msg, ts, md, state + else + {:ok, state} end - {:ok, state} end + @doc """ + Closes socket when the backend is removed + """ + def terminate(_, state) do + :ok = Socket.close(state) + + state + end + + ## Private Functions + defp log_event( - level, msg, ts, md, %{ - host: host, - port: port, + level, msg, ts, md, state = %{ type: type, - metadata: metadata, - socket: socket + metadata: metadata } ) do fields = md @@ -56,10 +92,19 @@ defmodule LoggerLogstashBackend do message: to_string(msg), fields: fields } - :gen_udp.send socket, host, port, to_char_list(json) + + with {:error, reason} <- Socket.send(state, [json, "\n"]) do + # fallback in case TCP configuration is bad + IO.puts :stderr, + "Could not log message (#{json}) to socket (#{url(state)}) due to #{inspect reason}. " <> + "Check that state (#{inspect state}) is correct." + end + + + {:ok, state} end - defp configure(name, opts) do + defp configure(name, opts) when is_atom(name) and is_list(opts) do env = Application.get_env :logger, name, [] opts = Keyword.merge env, opts Application.put_env :logger, name, opts @@ -69,18 +114,28 @@ defmodule LoggerLogstashBackend do type = Keyword.get opts, :type, "elixir" host = Keyword.get opts, :host port = Keyword.get opts, :port - {:ok, socket} = :gen_udp.open 0 - %{ - name: name, - host: to_char_list(host), - port: port, - level: level, - socket: socket, - type: type, - metadata: metadata - } + protocol = Keyword.get opts, :protocol, :udp + + state = %__MODULE__{level: level, metadata: metadata, name: name, protocol: protocol, type: type} + + configure_socket(state, host, port) end + defp configure_socket(state = %__MODULE__{host: host, port: port}) do + case Socket.open %{state | host: to_char_list(host), port: port} do + reply = {:error, reason} -> + IO.puts :stderr, "Could not open socket (#{url(state)}) due to reason #{inspect reason}" + + reply + other -> + other + end + end + + defp configure_socket(state, host, port) when is_nil(host) or is_nil(port), do: {:ok, state} + defp configure_socket(state, host, port), do: configure_socket %{state | host: to_char_list(host), port: port} + + # inspects the argument only if it is a pid defp inspect_pid(pid) when is_pid(pid), do: inspect(pid) defp inspect_pid(other), do: other @@ -91,4 +146,8 @@ defmodule LoggerLogstashBackend do {key, inspect_pid(value)} end end + + defp url(%__MODULE__{host: host, port: port, protocol: protocol}) do + "#{protocol}://#{host}:#{port}" + end end diff --git a/lib/logger_logstash_backend/socket.ex b/lib/logger_logstash_backend/socket.ex new file mode 100644 index 0000000..4230030 --- /dev/null +++ b/lib/logger_logstash_backend/socket.ex @@ -0,0 +1,50 @@ +defmodule LoggerLogstashBackend.Socket do + @moduledoc """ + Hides differences between `:udp` and `:tcp` sockets, so that protocol can be chosen in configuration + """ + + # Types + + @type protocol :: :tcp | :udp + + # Functions + + @doc """ + Closes the `protocol` `socket` + """ + @spec close(map) :: :ok + + def close(%{socket: socket}), do: :inet.close(socket) + + @doc """ + Opens a `protocol` socket using [`:gen_tcp.connect/3`](http://erlang.org/doc/man/gen_tcp.html#connect-3) or + [`:gen_udp.open/1`](http://erlang.org/doc/man/gen_udp.html#open-1). + """ + @spec open(map) :: {:ok, map} | {:error, :inet.posix} + + def open(map = %{host: host, port: port, protocol: :tcp}) do + host + |> :gen_tcp.connect(port, [{:active, true}, :binary, {:keepalive, true}]) + |> put_socket(map) + end + + def open(map = %{protocol: :udp}) do + 0 + |> :gen_udp.open + |> put_socket(map) + end + + @doc """ + Sends a `protocol` message over `socket`. + """ + @spec send(map, iodata) :: :ok | {:error, :closed | :not_owner | :inet.posix} + def send(%{protocol: :tcp, socket: socket}, packet), do: :gen_tcp.send(socket, packet) + def send(%{host: host, port: port, protocol: :udp, socket: socket}, packet) do + :gen_udp.send(socket, host, port, packet) + end + + ## Private Functions + + defp put_socket({:ok, socket}, map), do: {:ok, Map.put(map, :socket, socket)} + defp put_socket(other, _), do: other +end diff --git a/test/logger_logstash_backend/tcp_test.exs b/test/logger_logstash_backend/tcp_test.exs new file mode 100644 index 0000000..6797040 --- /dev/null +++ b/test/logger_logstash_backend/tcp_test.exs @@ -0,0 +1,118 @@ +################################################################################ +# Copyright 2015 Marcelo Gornstein +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +defmodule LoggerLogstashBackend.TCPTest do + use ExUnit.Case, async: false + require Logger + use Timex + + @backend {LoggerLogstashBackend, :tcp_test} + + setup context = %{line: line} do + # have to open socket before configure_backend, so that it is listening when connect happens + {:ok, listen_socket} = :gen_tcp.listen 0, [:binary, {:active, true}, {:ip, {127, 0, 0, 1}}, {:reuseaddr, true}] + {:ok, port} = :inet.port(listen_socket) + + backend = {LoggerLogstashBackend, String.to_atom("#{inspect __MODULE__}#{line}")} + + Logger.add_backend backend + Logger.configure_backend backend, [ + host: "127.0.0.1", + port: port, + level: :info, + type: "some_app", + metadata: [ + some_metadata: "go here" + ], + protocol: :tcp + ] + + {:ok, accept_socket} = :gen_tcp.accept(listen_socket, 1_000) + + on_exit fn -> + Logger.remove_backend backend + :ok = :gen_tcp.close accept_socket + :ok = :gen_tcp.close listen_socket + end + + {:ok, context} + end + + test "can log" do + Logger.info "hello world", [key1: "field1"] + + assert {:ok, data} = JSX.decode get_log! + assert data["type"] === "some_app" + assert data["message"] === "hello world" + + fields = data["fields"] + + assert fields["function"] == "test can log/1" + assert fields["key1"] == "field1" + assert fields["level"] == "info" + assert fields["line"] == 54 + assert fields["module"] == to_string(__MODULE__) + assert fields["pid"] == inspect(self) + assert fields["some_metadata"] == "go here" + + {:ok, ts} = Timex.parse data["@timestamp"], "%FT%T%z", :strftime + ts = Timex.to_unix ts + + now = Timex.to_unix Timex.DateTime.local + assert (now - ts) < 1000 + end + + test "can log pids" do + Logger.info "pid", [pid_key: self] + + {:ok, data} = JSX.decode get_log! + assert data["type"] === "some_app" + assert data["message"] === "pid" + + fields = data["fields"] + + assert fields["function"] == "test can log pids/1" + assert fields["pid_key"] == inspect(self) + assert fields["level"] == "info" + assert fields["line"] == 78 + assert fields["module"] == to_string(__MODULE__) + assert fields["pid"] == inspect(self) + assert fields["some_metadata"] == "go here" + + {:ok, ts} = Timex.parse data["@timestamp"], "%FT%T%z", :strftime + ts = Timex.to_unix ts + + now = Timex.to_unix Timex.DateTime.local + assert (now - ts) < 1000 + end + + test "cant log when minor levels" do + Logger.debug "hello world", [key1: "field1"] + {:error, :nothing_received} = get_log + end + + defp get_log do + receive do + {:tcp, _socket, json} -> {:ok, json} + after 500 -> {:error, :nothing_received} + end + end + + defp get_log! do + {:ok, log} = get_log + + log + end +end diff --git a/test/logger_logstash_backend_test.exs b/test/logger_logstash_backend/udp_test.exs similarity index 67% rename from test/logger_logstash_backend_test.exs rename to test/logger_logstash_backend/udp_test.exs index c40e60a..ba72089 100644 --- a/test/logger_logstash_backend_test.exs +++ b/test/logger_logstash_backend/udp_test.exs @@ -13,15 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ -defmodule LoggerLogstashBackendTest do +defmodule LoggerLogstashBackend.UDPTest do use ExUnit.Case, async: false require Logger use Timex - @backend {LoggerLogstashBackend, :test} - Logger.add_backend @backend + @backend {LoggerLogstashBackend, :udp_test} setup do + Logger.add_backend @backend Logger.configure_backend @backend, [ host: "127.0.0.1", port: 10001, @@ -32,28 +32,32 @@ defmodule LoggerLogstashBackendTest do ] ] {:ok, socket} = :gen_udp.open 10001, [:binary, {:active, true}] + on_exit fn -> + Logger.remove_backend @backend :ok = :gen_udp.close socket end + :ok end test "can log" do Logger.info "hello world", [key1: "field1"] - json = get_log - {:ok, data} = JSX.decode json + + assert {:ok, data} = JSX.decode get_log assert data["type"] === "some_app" assert data["message"] === "hello world" - expected = %{ - "function" => "test can log/1", - "level" => "info", - "module" => "Elixir.LoggerLogstashBackendTest", - "pid" => (inspect self), - "some_metadata" => "go here", - "line" => 42, - "key1" => "field1" - } - assert contains?(data["fields"], expected) + + fields = data["fields"] + + assert fields["function"] == "test can log/1" + assert fields["key1"] == "field1" + assert fields["level"] == "info" + assert fields["line"] == 45 + assert fields["module"] == to_string(__MODULE__) + assert fields["pid"] == inspect(self) + assert fields["some_metadata"] == "go here" + {:ok, ts} = Timex.parse data["@timestamp"], "%FT%T%z", :strftime ts = Timex.to_unix ts @@ -63,20 +67,21 @@ defmodule LoggerLogstashBackendTest do test "can log pids" do Logger.info "pid", [pid_key: self] - json = get_log - {:ok, data} = JSX.decode json + + {:ok, data} = JSX.decode get_log assert data["type"] === "some_app" assert data["message"] === "pid" - expected = %{ - "function" => "test can log pids/1", - "level" => "info", - "module" => "Elixir.LoggerLogstashBackendTest", - "pid" => (inspect self), - "pid_key" => inspect(self), - "some_metadata" => "go here", - "line" => 65 - } - assert contains?(data["fields"], expected) + + fields = data["fields"] + + assert fields["function"] == "test can log pids/1" + assert fields["pid_key"] == inspect(self) + assert fields["level"] == "info" + assert fields["line"] == 69 + assert fields["module"] == to_string(__MODULE__) + assert fields["pid"] == inspect(self) + assert fields["some_metadata"] == "go here" + {:ok, ts} = Timex.parse data["@timestamp"], "%FT%T%z", :strftime ts = Timex.to_unix ts @@ -91,14 +96,8 @@ defmodule LoggerLogstashBackendTest do defp get_log do receive do - {:udp, _, _, _, json} -> json + {:udp, _socket, _from_ip, _from_port, json} -> json after 500 -> :nothing_received end end - - defp contains?(map1, map2) do - Enum.all?(Map.to_list(map2), fn {key, value} -> - Map.fetch!(map1, key) == value - end) - end end From 53ea15be431a95092d755cae456aa01a759172b5 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Mon, 20 Jun 2016 15:46:48 -0500 Subject: [PATCH 2/8] Reconnect if accept socket is closed --- lib/logger_logstash_backend.ex | 10 ++++++++++ test/logger_logstash_backend/tcp_test.exs | 16 +++++++++++++--- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/lib/logger_logstash_backend.ex b/lib/logger_logstash_backend.ex index e02fe2b..166e07f 100644 --- a/lib/logger_logstash_backend.ex +++ b/lib/logger_logstash_backend.ex @@ -62,6 +62,16 @@ defmodule LoggerLogstashBackend do end end + def handle_info({:tcp_closed, socket}, state = %__MODULE__{host: host, port: port, socket: socket}) do + with {:error, _} <- configure_socket(state, host, port) do + {:ok, state} + end + end + + def handle_info(_, state) do + {:ok, state} + end + @doc """ Closes socket when the backend is removed """ diff --git a/test/logger_logstash_backend/tcp_test.exs b/test/logger_logstash_backend/tcp_test.exs index 6797040..82107d4 100644 --- a/test/logger_logstash_backend/tcp_test.exs +++ b/test/logger_logstash_backend/tcp_test.exs @@ -47,7 +47,11 @@ defmodule LoggerLogstashBackend.TCPTest do :ok = :gen_tcp.close listen_socket end - {:ok, context} + full_context = context + |> Map.put(:accept_socket, accept_socket) + |> Map.put(:listen_socket, listen_socket) + + {:ok, full_context} end test "can log" do @@ -62,7 +66,7 @@ defmodule LoggerLogstashBackend.TCPTest do assert fields["function"] == "test can log/1" assert fields["key1"] == "field1" assert fields["level"] == "info" - assert fields["line"] == 54 + assert fields["line"] == 58 assert fields["module"] == to_string(__MODULE__) assert fields["pid"] == inspect(self) assert fields["some_metadata"] == "go here" @@ -86,7 +90,7 @@ defmodule LoggerLogstashBackend.TCPTest do assert fields["function"] == "test can log pids/1" assert fields["pid_key"] == inspect(self) assert fields["level"] == "info" - assert fields["line"] == 78 + assert fields["line"] == 82 assert fields["module"] == to_string(__MODULE__) assert fields["pid"] == inspect(self) assert fields["some_metadata"] == "go here" @@ -103,6 +107,12 @@ defmodule LoggerLogstashBackend.TCPTest do {:error, :nothing_received} = get_log end + test "it reconnects if disconnected", %{accept_socket: accept_socket, listen_socket: listen_socket} do + :ok = :gen_tcp.close accept_socket + + assert {:ok, _} = :gen_tcp.accept(listen_socket, 1_000) + end + defp get_log do receive do {:tcp, _socket, json} -> {:ok, json} From 5030d8749ad705804798100e05e73ff6afdb57b4 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Tue, 21 Jun 2016 10:44:12 -0500 Subject: [PATCH 3/8] Test stderr fallback for TCP --- test/logger_logstash_backend/tcp_test.exs | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/test/logger_logstash_backend/tcp_test.exs b/test/logger_logstash_backend/tcp_test.exs index 82107d4..ea550d2 100644 --- a/test/logger_logstash_backend/tcp_test.exs +++ b/test/logger_logstash_backend/tcp_test.exs @@ -18,7 +18,9 @@ defmodule LoggerLogstashBackend.TCPTest do require Logger use Timex - @backend {LoggerLogstashBackend, :tcp_test} + import ExUnit.CaptureIO + + # Callbacks setup context = %{line: line} do # have to open socket before configure_backend, so that it is listening when connect happens @@ -66,7 +68,7 @@ defmodule LoggerLogstashBackend.TCPTest do assert fields["function"] == "test can log/1" assert fields["key1"] == "field1" assert fields["level"] == "info" - assert fields["line"] == 58 + assert fields["line"] == 60 assert fields["module"] == to_string(__MODULE__) assert fields["pid"] == inspect(self) assert fields["some_metadata"] == "go here" @@ -90,7 +92,7 @@ defmodule LoggerLogstashBackend.TCPTest do assert fields["function"] == "test can log pids/1" assert fields["pid_key"] == inspect(self) assert fields["level"] == "info" - assert fields["line"] == 82 + assert fields["line"] == 84 assert fields["module"] == to_string(__MODULE__) assert fields["pid"] == inspect(self) assert fields["some_metadata"] == "go here" @@ -113,6 +115,19 @@ defmodule LoggerLogstashBackend.TCPTest do assert {:ok, _} = :gen_tcp.accept(listen_socket, 1_000) end + test "if it can't reconnect, then is prints to :stderr", + %{accept_socket: accept_socket, listen_socket: listen_socket} do + :ok = :gen_tcp.close listen_socket + :ok = :gen_tcp.close accept_socket + + captured_stderr = capture_io :stderr, fn -> + Logger.info "Logged to stderr" + :timer.sleep 100 + end + + assert captured_stderr =~ "Logged to stderr" + end + defp get_log do receive do {:tcp, _socket, json} -> {:ok, json} From 6e6c896b750fe2462c6ebe4a9d58de7bcbebfaa2 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Tue, 21 Jun 2016 12:23:55 -0500 Subject: [PATCH 4/8] Reconnect on send for tcp If the listening socket is closed when :tcp_closed is received, then try to reconnect again on each send. --- lib/logger_logstash_backend.ex | 45 ++++++++++++++----- test/logger_logstash_backend/tcp_test.exs | 54 +++++++++++++++++++++-- 2 files changed, 84 insertions(+), 15 deletions(-) diff --git a/lib/logger_logstash_backend.ex b/lib/logger_logstash_backend.ex index 166e07f..6b7a385 100644 --- a/lib/logger_logstash_backend.ex +++ b/lib/logger_logstash_backend.ex @@ -63,8 +63,9 @@ defmodule LoggerLogstashBackend do end def handle_info({:tcp_closed, socket}, state = %__MODULE__{host: host, port: port, socket: socket}) do - with {:error, _} <- configure_socket(state, host, port) do - {:ok, state} + case configure_socket(state, host, port) do + {:error, _} -> {:ok, %{state | socket: nil}} + other -> other end end @@ -103,15 +104,7 @@ defmodule LoggerLogstashBackend do fields: fields } - with {:error, reason} <- Socket.send(state, [json, "\n"]) do - # fallback in case TCP configuration is bad - IO.puts :stderr, - "Could not log message (#{json}) to socket (#{url(state)}) due to #{inspect reason}. " <> - "Check that state (#{inspect state}) is correct." - end - - - {:ok, state} + send_with_retry(state, json) end defp configure(name, opts) when is_atom(name) and is_list(opts) do @@ -145,6 +138,11 @@ defmodule LoggerLogstashBackend do defp configure_socket(state, host, port) when is_nil(host) or is_nil(port), do: {:ok, state} defp configure_socket(state, host, port), do: configure_socket %{state | host: to_char_list(host), port: port} + defp fallback_log(state, json, reason) do + IO.puts :stderr, + "Could not log message (#{json}) to socket (#{url(state)}) due to #{inspect reason}. " <> + "Check that state (#{inspect state}) is correct." + end # inspects the argument only if it is a pid defp inspect_pid(pid) when is_pid(pid), do: inspect(pid) @@ -157,6 +155,31 @@ defmodule LoggerLogstashBackend do end end + defp send_with_retry(state = %__MODULE__{socket: nil}, json), do: send_with_new_socket(state, json) + defp send_with_retry(state, json) do + case Socket.send(state, [json, "\n"]) do + :ok -> + {:ok, state} + {:error, :closed} -> + send_with_new_socket(state, json) + {:error, reason} -> + fallback_log(state, json, reason) + + {:ok, state} + end + end + + defp send_with_new_socket(state, json) do + case configure_socket(state) do + {:error, reason} -> + fallback_log(state, json, reason) + + {:ok, state} + {:ok, new_state} -> + send_with_retry(new_state, json) + end + end + defp url(%__MODULE__{host: host, port: port, protocol: protocol}) do "#{protocol}://#{host}:#{port}" end diff --git a/test/logger_logstash_backend/tcp_test.exs b/test/logger_logstash_backend/tcp_test.exs index ea550d2..803c5f4 100644 --- a/test/logger_logstash_backend/tcp_test.exs +++ b/test/logger_logstash_backend/tcp_test.exs @@ -19,12 +19,21 @@ defmodule LoggerLogstashBackend.TCPTest do use Timex import ExUnit.CaptureIO + import ExUnit.CaptureLog + + # Functions + + def accept(listen_socket), do: :gen_tcp.accept(listen_socket, 1_000) + + def listen(port \\ 0) do + :gen_tcp.listen port, [:binary, {:active, true}, {:ip, {127, 0, 0, 1}}, {:reuseaddr, true}] + end # Callbacks setup context = %{line: line} do # have to open socket before configure_backend, so that it is listening when connect happens - {:ok, listen_socket} = :gen_tcp.listen 0, [:binary, {:active, true}, {:ip, {127, 0, 0, 1}}, {:reuseaddr, true}] + {:ok, listen_socket} = listen {:ok, port} = :inet.port(listen_socket) backend = {LoggerLogstashBackend, String.to_atom("#{inspect __MODULE__}#{line}")} @@ -41,7 +50,7 @@ defmodule LoggerLogstashBackend.TCPTest do protocol: :tcp ] - {:ok, accept_socket} = :gen_tcp.accept(listen_socket, 1_000) + {:ok, accept_socket} = accept(listen_socket) on_exit fn -> Logger.remove_backend backend @@ -52,6 +61,7 @@ defmodule LoggerLogstashBackend.TCPTest do full_context = context |> Map.put(:accept_socket, accept_socket) |> Map.put(:listen_socket, listen_socket) + |> Map.put(:port, port) {:ok, full_context} end @@ -68,7 +78,7 @@ defmodule LoggerLogstashBackend.TCPTest do assert fields["function"] == "test can log/1" assert fields["key1"] == "field1" assert fields["level"] == "info" - assert fields["line"] == 60 + assert fields["line"] == 70 assert fields["module"] == to_string(__MODULE__) assert fields["pid"] == inspect(self) assert fields["some_metadata"] == "go here" @@ -92,7 +102,7 @@ defmodule LoggerLogstashBackend.TCPTest do assert fields["function"] == "test can log pids/1" assert fields["pid_key"] == inspect(self) assert fields["level"] == "info" - assert fields["line"] == 84 + assert fields["line"] == 94 assert fields["module"] == to_string(__MODULE__) assert fields["pid"] == inspect(self) assert fields["some_metadata"] == "go here" @@ -115,6 +125,42 @@ defmodule LoggerLogstashBackend.TCPTest do assert {:ok, _} = :gen_tcp.accept(listen_socket, 1_000) end + test "it reconnects on send if disconnected and listening socket is closed " <> + "when handle_info({:tcp_closed, _}, _) is called", + %{accept_socket: accept_socket, listen_socket: listen_socket, port: port} do + :ok = :gen_tcp.close listen_socket + :ok = :gen_tcp.close accept_socket + + Logger.info "Can't connect" + :timer.sleep 100 + + {:ok, new_listen_socket} = listen(port) + + # Need to accept and wait in a separate process because connect won't come until the Logger.info call tries to + # establish a new socket + pid = self + spawn_link fn -> + {:ok, new_accept_socket} = accept(new_listen_socket) + + send pid, {:new_accept_socket, new_accept_socket} + + receive do + # forward to pid so that get_log! works as normal + message = {:tcp, _, _} -> send pid, message + end + end + + Logger.info "I reconnected" + + assert_receive {:new_accept_socket, _} + + :timer.sleep 100 + + {:ok, data} = JSX.decode get_log! + + assert data["message"] == "I reconnected" + end + test "if it can't reconnect, then is prints to :stderr", %{accept_socket: accept_socket, listen_socket: listen_socket} do :ok = :gen_tcp.close listen_socket From 08979b0684bf8dfe6090a004a0150fcdb9f1ab19 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Tue, 21 Jun 2016 13:17:37 -0500 Subject: [PATCH 5/8] Disable console logger to clean up mix test output --- config/config.exs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/config/config.exs b/config/config.exs index d2d855e..c4fdc09 100644 --- a/config/config.exs +++ b/config/config.exs @@ -1 +1,5 @@ use Mix.Config + +config :logger, + # disable console backend, so it does not clutter `mix test` output + backends: [] From 2c184face0a8fc29edca5659a0a452f0a2812137 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Tue, 21 Jun 2016 13:19:01 -0500 Subject: [PATCH 6/8] Remove unused import --- test/logger_logstash_backend/tcp_test.exs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/test/logger_logstash_backend/tcp_test.exs b/test/logger_logstash_backend/tcp_test.exs index 803c5f4..5bc57a7 100644 --- a/test/logger_logstash_backend/tcp_test.exs +++ b/test/logger_logstash_backend/tcp_test.exs @@ -19,7 +19,6 @@ defmodule LoggerLogstashBackend.TCPTest do use Timex import ExUnit.CaptureIO - import ExUnit.CaptureLog # Functions @@ -78,7 +77,7 @@ defmodule LoggerLogstashBackend.TCPTest do assert fields["function"] == "test can log/1" assert fields["key1"] == "field1" assert fields["level"] == "info" - assert fields["line"] == 70 + assert fields["line"] == 69 assert fields["module"] == to_string(__MODULE__) assert fields["pid"] == inspect(self) assert fields["some_metadata"] == "go here" @@ -102,7 +101,7 @@ defmodule LoggerLogstashBackend.TCPTest do assert fields["function"] == "test can log pids/1" assert fields["pid_key"] == inspect(self) assert fields["level"] == "info" - assert fields["line"] == 94 + assert fields["line"] == 93 assert fields["module"] == to_string(__MODULE__) assert fields["pid"] == inspect(self) assert fields["some_metadata"] == "go here" From 647daeafbf07ee73a8abc3891823d1dce89eedff Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Tue, 21 Jun 2016 13:43:34 -0500 Subject: [PATCH 7/8] Suppess stderr output not under test --- test/logger_logstash_backend/tcp_test.exs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/test/logger_logstash_backend/tcp_test.exs b/test/logger_logstash_backend/tcp_test.exs index 5bc57a7..cad7df8 100644 --- a/test/logger_logstash_backend/tcp_test.exs +++ b/test/logger_logstash_backend/tcp_test.exs @@ -130,8 +130,11 @@ defmodule LoggerLogstashBackend.TCPTest do :ok = :gen_tcp.close listen_socket :ok = :gen_tcp.close accept_socket - Logger.info "Can't connect" - :timer.sleep 100 + # suppress error messages not being tested + capture_io :stderr, fn -> + Logger.info "Can't connect" + :timer.sleep 100 + end {:ok, new_listen_socket} = listen(port) From fda5517f1232290efab2956743ba805e9c79cf94 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Wed, 20 Jul 2016 14:02:14 -0500 Subject: [PATCH 8/8] Support initial connection failing If the initial (TCP) connection to the logging server fails, use the same error recovery as with disconnect: log the message to stderr. --- lib/logger_logstash_backend.ex | 22 ++-- test/logger_logstash_backend/tcp_test.exs | 118 +++++++++++++++++----- 2 files changed, 109 insertions(+), 31 deletions(-) diff --git a/lib/logger_logstash_backend.ex b/lib/logger_logstash_backend.ex index 6b7a385..541223e 100644 --- a/lib/logger_logstash_backend.ex +++ b/lib/logger_logstash_backend.ex @@ -62,8 +62,8 @@ defmodule LoggerLogstashBackend do end end - def handle_info({:tcp_closed, socket}, state = %__MODULE__{host: host, port: port, socket: socket}) do - case configure_socket(state, host, port) do + def handle_info({:tcp_closed, socket}, state = %__MODULE__{socket: socket}) do + case configure_socket(state) do {:error, _} -> {:ok, %{state | socket: nil}} other -> other end @@ -120,12 +120,18 @@ defmodule LoggerLogstashBackend do protocol = Keyword.get opts, :protocol, :udp state = %__MODULE__{level: level, metadata: metadata, name: name, protocol: protocol, type: type} + |> put_destination(host, port) + + new_state = case configure_socket(state) do + {:error, _} -> state + {:ok, configured_state} -> configured_state + end - configure_socket(state, host, port) + {:ok, new_state} end - defp configure_socket(state = %__MODULE__{host: host, port: port}) do - case Socket.open %{state | host: to_char_list(host), port: port} do + defp configure_socket(state = %__MODULE__{}) do + case Socket.open state do reply = {:error, reason} -> IO.puts :stderr, "Could not open socket (#{url(state)}) due to reason #{inspect reason}" @@ -135,9 +141,6 @@ defmodule LoggerLogstashBackend do end end - defp configure_socket(state, host, port) when is_nil(host) or is_nil(port), do: {:ok, state} - defp configure_socket(state, host, port), do: configure_socket %{state | host: to_char_list(host), port: port} - defp fallback_log(state, json, reason) do IO.puts :stderr, "Could not log message (#{json}) to socket (#{url(state)}) due to #{inspect reason}. " <> @@ -155,6 +158,9 @@ defmodule LoggerLogstashBackend do end end + defp put_destination(state = %__MODULE__{}, host, port) when is_nil(host) or is_nil(port), do: state + defp put_destination(state = %__MODULE__{}, host, port), do: %{state | host: to_char_list(host), port: port} + defp send_with_retry(state = %__MODULE__{socket: nil}, json), do: send_with_new_socket(state, json) defp send_with_retry(state, json) do case Socket.send(state, [json, "\n"]) do diff --git a/test/logger_logstash_backend/tcp_test.exs b/test/logger_logstash_backend/tcp_test.exs index cad7df8..fd9a802 100644 --- a/test/logger_logstash_backend/tcp_test.exs +++ b/test/logger_logstash_backend/tcp_test.exs @@ -24,18 +24,47 @@ defmodule LoggerLogstashBackend.TCPTest do def accept(listen_socket), do: :gen_tcp.accept(listen_socket, 1_000) + def assert_logged_to_stderr(message) do + assert capture_logger_info(message) =~ message + end + + def capture_logger_info(message) do + capture_io :stderr, fn -> + Logger.info message + :timer.sleep 100 + end + end + + def capture_setup_backend(context) do + capture_io :stderr, fn -> + context + |> Map.put(:backend, true) + |> setup_backend + end + end + def listen(port \\ 0) do :gen_tcp.listen port, [:binary, {:active, true}, {:ip, {127, 0, 0, 1}}, {:reuseaddr, true}] end - # Callbacks + def setup_accept(context = %{accept: true}) do + %{listen_socket: listen_socket} = context - setup context = %{line: line} do - # have to open socket before configure_backend, so that it is listening when connect happens - {:ok, listen_socket} = listen - {:ok, port} = :inet.port(listen_socket) + {:ok, accept_socket} = accept(listen_socket) + + on_exit fn -> + :ok = :gen_tcp.close accept_socket + end - backend = {LoggerLogstashBackend, String.to_atom("#{inspect __MODULE__}#{line}")} + Map.put(context, :accept_socket, accept_socket) + end + def setup_accept(context), do: context + + def setup_backend(context = %{backend: true}) do + # don't include in function head so that backend true will always match even if forgot to populate :port + %{line: line, port: port} = context + + backend = {LoggerLogstashBackend, String.to_atom("#{inspect __MODULE__}:#{line}")} Logger.add_backend backend Logger.configure_backend backend, [ @@ -49,22 +78,43 @@ defmodule LoggerLogstashBackend.TCPTest do protocol: :tcp ] - {:ok, accept_socket} = accept(listen_socket) - on_exit fn -> Logger.remove_backend backend - :ok = :gen_tcp.close accept_socket + end + + context + end + def setup_backend(context), do: context + + def setup_listen(context = %{listen: true}) do + {:ok, listen_socket} = listen + {:ok, port} = :inet.port(listen_socket) + + on_exit fn -> :ok = :gen_tcp.close listen_socket end + context + |> Map.put(:listen_socket, listen_socket) + |> Map.put(:port, port) + end + def setup_listen(context), do: context + + # Callbacks + + setup context do full_context = context - |> Map.put(:accept_socket, accept_socket) - |> Map.put(:listen_socket, listen_socket) - |> Map.put(:port, port) + # have to open socket before configure_backend, so that it is listening when connect happens + |> setup_listen + |> setup_backend + |> setup_accept {:ok, full_context} end + @tag :accept + @tag :backend + @tag :listen test "can log" do Logger.info "hello world", [key1: "field1"] @@ -77,7 +127,7 @@ defmodule LoggerLogstashBackend.TCPTest do assert fields["function"] == "test can log/1" assert fields["key1"] == "field1" assert fields["level"] == "info" - assert fields["line"] == 69 + assert fields["line"] == 119 assert fields["module"] == to_string(__MODULE__) assert fields["pid"] == inspect(self) assert fields["some_metadata"] == "go here" @@ -89,6 +139,9 @@ defmodule LoggerLogstashBackend.TCPTest do assert (now - ts) < 1000 end + @tag :accept + @tag :backend + @tag :listen test "can log pids" do Logger.info "pid", [pid_key: self] @@ -101,7 +154,7 @@ defmodule LoggerLogstashBackend.TCPTest do assert fields["function"] == "test can log pids/1" assert fields["pid_key"] == inspect(self) assert fields["level"] == "info" - assert fields["line"] == 93 + assert fields["line"] == 146 assert fields["module"] == to_string(__MODULE__) assert fields["pid"] == inspect(self) assert fields["some_metadata"] == "go here" @@ -113,17 +166,26 @@ defmodule LoggerLogstashBackend.TCPTest do assert (now - ts) < 1000 end + @tag :accept + @tag :backend + @tag :listen test "cant log when minor levels" do Logger.debug "hello world", [key1: "field1"] {:error, :nothing_received} = get_log end + @tag :accept + @tag :backend + @tag :listen test "it reconnects if disconnected", %{accept_socket: accept_socket, listen_socket: listen_socket} do :ok = :gen_tcp.close accept_socket assert {:ok, _} = :gen_tcp.accept(listen_socket, 1_000) end + @tag :accept + @tag :backend + @tag :listen test "it reconnects on send if disconnected and listening socket is closed " <> "when handle_info({:tcp_closed, _}, _) is called", %{accept_socket: accept_socket, listen_socket: listen_socket, port: port} do @@ -131,10 +193,7 @@ defmodule LoggerLogstashBackend.TCPTest do :ok = :gen_tcp.close accept_socket # suppress error messages not being tested - capture_io :stderr, fn -> - Logger.info "Can't connect" - :timer.sleep 100 - end + capture_logger_info "Can't connect" {:ok, new_listen_socket} = listen(port) @@ -163,17 +222,30 @@ defmodule LoggerLogstashBackend.TCPTest do assert data["message"] == "I reconnected" end + @tag :accept + @tag :backend + @tag :listen test "if it can't reconnect, then is prints to :stderr", %{accept_socket: accept_socket, listen_socket: listen_socket} do :ok = :gen_tcp.close listen_socket :ok = :gen_tcp.close accept_socket - captured_stderr = capture_io :stderr, fn -> - Logger.info "Logged to stderr" - :timer.sleep 100 - end + assert_logged_to_stderr("Logged to stderr") + end + + @tag :listen + test "if it can't connect, then it prints to :stderr", context = %{listen_socket: listen_socket, port: port} do + :ok = :gen_tcp.close listen_socket + + assert capture_setup_backend(context) =~ "Could not open socket (tcp://127.0.0.1:#{port}) due to reason :econnrefused\n" + end + + @tag :listen + test "if it can't connect, then any log messages print to :stderr", context = %{listen_socket: listen_socket} do + :ok = :gen_tcp.close listen_socket + capture_setup_backend(context) - assert captured_stderr =~ "Logged to stderr" + assert_logged_to_stderr("Logged to stderr") end defp get_log do