diff --git a/config/config.exs b/config/config.exs index d2d855e..c4fdc09 100644 --- a/config/config.exs +++ b/config/config.exs @@ -1 +1,5 @@ use Mix.Config + +config :logger, + # disable console backend, so it does not clutter `mix test` output + backends: [] diff --git a/lib/logger_logstash_backend.ex b/lib/logger_logstash_backend.ex index 2c4106f..541223e 100644 --- a/lib/logger_logstash_backend.ex +++ b/lib/logger_logstash_backend.ex @@ -14,15 +14,42 @@ # limitations under the License. ################################################################################ defmodule LoggerLogstashBackend do + alias LoggerLogstashBackend.Socket + use GenEvent use Timex + # Struct + + defstruct ~w( + host + level + metadata + name + port + protocol + socket + type + )a + + # Functions + + ## GenEvent callbacks + def init({__MODULE__, name}) do - {:ok, configure(name, [])} + # trap exits, so that cleanup can occur in terminate/2 + Process.flag(:trap_exit, true) + + configure(name, []) end def handle_call({:configure, opts}, %{name: name}) do - {:ok, :ok, configure(name, opts)} + case configure(name, opts) do + {:ok, state} -> + {:ok, :ok, state} + reply = {:error, _reason} -> + {:remove_handler, reply} + end end def handle_event( @@ -30,17 +57,37 @@ defmodule LoggerLogstashBackend do ) do if is_nil(min_level) or Logger.compare_levels(level, min_level) != :lt do log_event level, msg, ts, md, state + else + {:ok, state} end + end + + def handle_info({:tcp_closed, socket}, state = %__MODULE__{socket: socket}) do + case configure_socket(state) do + {:error, _} -> {:ok, %{state | socket: nil}} + other -> other + end + end + + def handle_info(_, state) do {:ok, state} end + @doc """ + Closes socket when the backend is removed + """ + def terminate(_, state) do + :ok = Socket.close(state) + + state + end + + ## Private Functions + defp log_event( - level, msg, ts, md, %{ - host: host, - port: port, + level, msg, ts, md, state = %{ type: type, - metadata: metadata, - socket: socket + metadata: metadata } ) do fields = md @@ -56,10 +103,11 @@ defmodule LoggerLogstashBackend do message: to_string(msg), fields: fields } - :gen_udp.send socket, host, port, to_char_list(json) + + send_with_retry(state, json) end - defp configure(name, opts) do + defp configure(name, opts) when is_atom(name) and is_list(opts) do env = Application.get_env :logger, name, [] opts = Keyword.merge env, opts Application.put_env :logger, name, opts @@ -69,16 +117,34 @@ defmodule LoggerLogstashBackend do type = Keyword.get opts, :type, "elixir" host = Keyword.get opts, :host port = Keyword.get opts, :port - {:ok, socket} = :gen_udp.open 0 - %{ - name: name, - host: to_char_list(host), - port: port, - level: level, - socket: socket, - type: type, - metadata: metadata - } + protocol = Keyword.get opts, :protocol, :udp + + state = %__MODULE__{level: level, metadata: metadata, name: name, protocol: protocol, type: type} + |> put_destination(host, port) + + new_state = case configure_socket(state) do + {:error, _} -> state + {:ok, configured_state} -> configured_state + end + + {:ok, new_state} + end + + defp configure_socket(state = %__MODULE__{}) do + case Socket.open state do + reply = {:error, reason} -> + IO.puts :stderr, "Could not open socket (#{url(state)}) due to reason #{inspect reason}" + + reply + other -> + other + end + end + + defp fallback_log(state, json, reason) do + IO.puts :stderr, + "Could not log message (#{json}) to socket (#{url(state)}) due to #{inspect reason}. " <> + "Check that state (#{inspect state}) is correct." end # inspects the argument only if it is a pid @@ -91,4 +157,36 @@ defmodule LoggerLogstashBackend do {key, inspect_pid(value)} end end + + defp put_destination(state = %__MODULE__{}, host, port) when is_nil(host) or is_nil(port), do: state + defp put_destination(state = %__MODULE__{}, host, port), do: %{state | host: to_char_list(host), port: port} + + defp send_with_retry(state = %__MODULE__{socket: nil}, json), do: send_with_new_socket(state, json) + defp send_with_retry(state, json) do + case Socket.send(state, [json, "\n"]) do + :ok -> + {:ok, state} + {:error, :closed} -> + send_with_new_socket(state, json) + {:error, reason} -> + fallback_log(state, json, reason) + + {:ok, state} + end + end + + defp send_with_new_socket(state, json) do + case configure_socket(state) do + {:error, reason} -> + fallback_log(state, json, reason) + + {:ok, state} + {:ok, new_state} -> + send_with_retry(new_state, json) + end + end + + defp url(%__MODULE__{host: host, port: port, protocol: protocol}) do + "#{protocol}://#{host}:#{port}" + end end diff --git a/lib/logger_logstash_backend/socket.ex b/lib/logger_logstash_backend/socket.ex new file mode 100644 index 0000000..4230030 --- /dev/null +++ b/lib/logger_logstash_backend/socket.ex @@ -0,0 +1,50 @@ +defmodule LoggerLogstashBackend.Socket do + @moduledoc """ + Hides differences between `:udp` and `:tcp` sockets, so that protocol can be chosen in configuration + """ + + # Types + + @type protocol :: :tcp | :udp + + # Functions + + @doc """ + Closes the `protocol` `socket` + """ + @spec close(map) :: :ok + + def close(%{socket: socket}), do: :inet.close(socket) + + @doc """ + Opens a `protocol` socket using [`:gen_tcp.connect/3`](http://erlang.org/doc/man/gen_tcp.html#connect-3) or + [`:gen_udp.open/1`](http://erlang.org/doc/man/gen_udp.html#open-1). + """ + @spec open(map) :: {:ok, map} | {:error, :inet.posix} + + def open(map = %{host: host, port: port, protocol: :tcp}) do + host + |> :gen_tcp.connect(port, [{:active, true}, :binary, {:keepalive, true}]) + |> put_socket(map) + end + + def open(map = %{protocol: :udp}) do + 0 + |> :gen_udp.open + |> put_socket(map) + end + + @doc """ + Sends a `protocol` message over `socket`. + """ + @spec send(map, iodata) :: :ok | {:error, :closed | :not_owner | :inet.posix} + def send(%{protocol: :tcp, socket: socket}, packet), do: :gen_tcp.send(socket, packet) + def send(%{host: host, port: port, protocol: :udp, socket: socket}, packet) do + :gen_udp.send(socket, host, port, packet) + end + + ## Private Functions + + defp put_socket({:ok, socket}, map), do: {:ok, Map.put(map, :socket, socket)} + defp put_socket(other, _), do: other +end diff --git a/test/logger_logstash_backend/tcp_test.exs b/test/logger_logstash_backend/tcp_test.exs new file mode 100644 index 0000000..fd9a802 --- /dev/null +++ b/test/logger_logstash_backend/tcp_test.exs @@ -0,0 +1,263 @@ +################################################################################ +# Copyright 2015 Marcelo Gornstein +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +defmodule LoggerLogstashBackend.TCPTest do + use ExUnit.Case, async: false + require Logger + use Timex + + import ExUnit.CaptureIO + + # Functions + + def accept(listen_socket), do: :gen_tcp.accept(listen_socket, 1_000) + + def assert_logged_to_stderr(message) do + assert capture_logger_info(message) =~ message + end + + def capture_logger_info(message) do + capture_io :stderr, fn -> + Logger.info message + :timer.sleep 100 + end + end + + def capture_setup_backend(context) do + capture_io :stderr, fn -> + context + |> Map.put(:backend, true) + |> setup_backend + end + end + + def listen(port \\ 0) do + :gen_tcp.listen port, [:binary, {:active, true}, {:ip, {127, 0, 0, 1}}, {:reuseaddr, true}] + end + + def setup_accept(context = %{accept: true}) do + %{listen_socket: listen_socket} = context + + {:ok, accept_socket} = accept(listen_socket) + + on_exit fn -> + :ok = :gen_tcp.close accept_socket + end + + Map.put(context, :accept_socket, accept_socket) + end + def setup_accept(context), do: context + + def setup_backend(context = %{backend: true}) do + # don't include in function head so that backend true will always match even if forgot to populate :port + %{line: line, port: port} = context + + backend = {LoggerLogstashBackend, String.to_atom("#{inspect __MODULE__}:#{line}")} + + Logger.add_backend backend + Logger.configure_backend backend, [ + host: "127.0.0.1", + port: port, + level: :info, + type: "some_app", + metadata: [ + some_metadata: "go here" + ], + protocol: :tcp + ] + + on_exit fn -> + Logger.remove_backend backend + end + + context + end + def setup_backend(context), do: context + + def setup_listen(context = %{listen: true}) do + {:ok, listen_socket} = listen + {:ok, port} = :inet.port(listen_socket) + + on_exit fn -> + :ok = :gen_tcp.close listen_socket + end + + context + |> Map.put(:listen_socket, listen_socket) + |> Map.put(:port, port) + end + def setup_listen(context), do: context + + # Callbacks + + setup context do + full_context = context + # have to open socket before configure_backend, so that it is listening when connect happens + |> setup_listen + |> setup_backend + |> setup_accept + + {:ok, full_context} + end + + @tag :accept + @tag :backend + @tag :listen + test "can log" do + Logger.info "hello world", [key1: "field1"] + + assert {:ok, data} = JSX.decode get_log! + assert data["type"] === "some_app" + assert data["message"] === "hello world" + + fields = data["fields"] + + assert fields["function"] == "test can log/1" + assert fields["key1"] == "field1" + assert fields["level"] == "info" + assert fields["line"] == 119 + assert fields["module"] == to_string(__MODULE__) + assert fields["pid"] == inspect(self) + assert fields["some_metadata"] == "go here" + + {:ok, ts} = Timex.parse data["@timestamp"], "%FT%T%z", :strftime + ts = Timex.to_unix ts + + now = Timex.to_unix Timex.DateTime.local + assert (now - ts) < 1000 + end + + @tag :accept + @tag :backend + @tag :listen + test "can log pids" do + Logger.info "pid", [pid_key: self] + + {:ok, data} = JSX.decode get_log! + assert data["type"] === "some_app" + assert data["message"] === "pid" + + fields = data["fields"] + + assert fields["function"] == "test can log pids/1" + assert fields["pid_key"] == inspect(self) + assert fields["level"] == "info" + assert fields["line"] == 146 + assert fields["module"] == to_string(__MODULE__) + assert fields["pid"] == inspect(self) + assert fields["some_metadata"] == "go here" + + {:ok, ts} = Timex.parse data["@timestamp"], "%FT%T%z", :strftime + ts = Timex.to_unix ts + + now = Timex.to_unix Timex.DateTime.local + assert (now - ts) < 1000 + end + + @tag :accept + @tag :backend + @tag :listen + test "cant log when minor levels" do + Logger.debug "hello world", [key1: "field1"] + {:error, :nothing_received} = get_log + end + + @tag :accept + @tag :backend + @tag :listen + test "it reconnects if disconnected", %{accept_socket: accept_socket, listen_socket: listen_socket} do + :ok = :gen_tcp.close accept_socket + + assert {:ok, _} = :gen_tcp.accept(listen_socket, 1_000) + end + + @tag :accept + @tag :backend + @tag :listen + test "it reconnects on send if disconnected and listening socket is closed " <> + "when handle_info({:tcp_closed, _}, _) is called", + %{accept_socket: accept_socket, listen_socket: listen_socket, port: port} do + :ok = :gen_tcp.close listen_socket + :ok = :gen_tcp.close accept_socket + + # suppress error messages not being tested + capture_logger_info "Can't connect" + + {:ok, new_listen_socket} = listen(port) + + # Need to accept and wait in a separate process because connect won't come until the Logger.info call tries to + # establish a new socket + pid = self + spawn_link fn -> + {:ok, new_accept_socket} = accept(new_listen_socket) + + send pid, {:new_accept_socket, new_accept_socket} + + receive do + # forward to pid so that get_log! works as normal + message = {:tcp, _, _} -> send pid, message + end + end + + Logger.info "I reconnected" + + assert_receive {:new_accept_socket, _} + + :timer.sleep 100 + + {:ok, data} = JSX.decode get_log! + + assert data["message"] == "I reconnected" + end + + @tag :accept + @tag :backend + @tag :listen + test "if it can't reconnect, then is prints to :stderr", + %{accept_socket: accept_socket, listen_socket: listen_socket} do + :ok = :gen_tcp.close listen_socket + :ok = :gen_tcp.close accept_socket + + assert_logged_to_stderr("Logged to stderr") + end + + @tag :listen + test "if it can't connect, then it prints to :stderr", context = %{listen_socket: listen_socket, port: port} do + :ok = :gen_tcp.close listen_socket + + assert capture_setup_backend(context) =~ "Could not open socket (tcp://127.0.0.1:#{port}) due to reason :econnrefused\n" + end + + @tag :listen + test "if it can't connect, then any log messages print to :stderr", context = %{listen_socket: listen_socket} do + :ok = :gen_tcp.close listen_socket + capture_setup_backend(context) + + assert_logged_to_stderr("Logged to stderr") + end + + defp get_log do + receive do + {:tcp, _socket, json} -> {:ok, json} + after 500 -> {:error, :nothing_received} + end + end + + defp get_log! do + {:ok, log} = get_log + + log + end +end diff --git a/test/logger_logstash_backend_test.exs b/test/logger_logstash_backend/udp_test.exs similarity index 67% rename from test/logger_logstash_backend_test.exs rename to test/logger_logstash_backend/udp_test.exs index c40e60a..ba72089 100644 --- a/test/logger_logstash_backend_test.exs +++ b/test/logger_logstash_backend/udp_test.exs @@ -13,15 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ -defmodule LoggerLogstashBackendTest do +defmodule LoggerLogstashBackend.UDPTest do use ExUnit.Case, async: false require Logger use Timex - @backend {LoggerLogstashBackend, :test} - Logger.add_backend @backend + @backend {LoggerLogstashBackend, :udp_test} setup do + Logger.add_backend @backend Logger.configure_backend @backend, [ host: "127.0.0.1", port: 10001, @@ -32,28 +32,32 @@ defmodule LoggerLogstashBackendTest do ] ] {:ok, socket} = :gen_udp.open 10001, [:binary, {:active, true}] + on_exit fn -> + Logger.remove_backend @backend :ok = :gen_udp.close socket end + :ok end test "can log" do Logger.info "hello world", [key1: "field1"] - json = get_log - {:ok, data} = JSX.decode json + + assert {:ok, data} = JSX.decode get_log assert data["type"] === "some_app" assert data["message"] === "hello world" - expected = %{ - "function" => "test can log/1", - "level" => "info", - "module" => "Elixir.LoggerLogstashBackendTest", - "pid" => (inspect self), - "some_metadata" => "go here", - "line" => 42, - "key1" => "field1" - } - assert contains?(data["fields"], expected) + + fields = data["fields"] + + assert fields["function"] == "test can log/1" + assert fields["key1"] == "field1" + assert fields["level"] == "info" + assert fields["line"] == 45 + assert fields["module"] == to_string(__MODULE__) + assert fields["pid"] == inspect(self) + assert fields["some_metadata"] == "go here" + {:ok, ts} = Timex.parse data["@timestamp"], "%FT%T%z", :strftime ts = Timex.to_unix ts @@ -63,20 +67,21 @@ defmodule LoggerLogstashBackendTest do test "can log pids" do Logger.info "pid", [pid_key: self] - json = get_log - {:ok, data} = JSX.decode json + + {:ok, data} = JSX.decode get_log assert data["type"] === "some_app" assert data["message"] === "pid" - expected = %{ - "function" => "test can log pids/1", - "level" => "info", - "module" => "Elixir.LoggerLogstashBackendTest", - "pid" => (inspect self), - "pid_key" => inspect(self), - "some_metadata" => "go here", - "line" => 65 - } - assert contains?(data["fields"], expected) + + fields = data["fields"] + + assert fields["function"] == "test can log pids/1" + assert fields["pid_key"] == inspect(self) + assert fields["level"] == "info" + assert fields["line"] == 69 + assert fields["module"] == to_string(__MODULE__) + assert fields["pid"] == inspect(self) + assert fields["some_metadata"] == "go here" + {:ok, ts} = Timex.parse data["@timestamp"], "%FT%T%z", :strftime ts = Timex.to_unix ts @@ -91,14 +96,8 @@ defmodule LoggerLogstashBackendTest do defp get_log do receive do - {:udp, _, _, _, json} -> json + {:udp, _socket, _from_ip, _from_port, json} -> json after 500 -> :nothing_received end end - - defp contains?(map1, map2) do - Enum.all?(Map.to_list(map2), fn {key, value} -> - Map.fetch!(map1, key) == value - end) - end end