From 996a77af790f25a271a4c2760b9ca47da6503121 Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Wed, 18 Apr 2018 16:28:49 +0200 Subject: [PATCH 01/17] Send milliseconds in timestamp to Logstash. Remove dependency on `Timex` and `exjsx`. --- README.md | 8 +- lib/logger_logstash_backend.ex | 107 +++++++++++++++----------- mix.exs | 23 +++--- mix.lock | 7 +- test/logger_logstash_backend_test.exs | 62 ++++++++------- 5 files changed, 122 insertions(+), 85 deletions(-) diff --git a/README.md b/README.md index ec55fd8..8b71913 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,8 @@ that will send logs to the [Logstash UDP input](https://www.elastic.co/guide/en/ be merged with the metadata sent in every log message. * **level**: Atom. Minimum level for this backend. * **type**: String.t. Type of logs. Useful to filter in logstash. + * **timezone**: String.t. Server timezone. Used to convert from naive timestamp. Default `"Etc/UTC"`. + * **json_encoder**: Atom. Module to be used for JSON encoding. Default `Jason`. ## Sample Logstash config ``` @@ -51,7 +53,7 @@ Add logger and tzdata as applications: ```elixir def application do - [applications: [:logger, :timex]] + [applications: [:logger]] end ``` @@ -81,5 +83,7 @@ config :logger, :error_log, type: "my_type_of_app_or_node", metadata: [ extra_fields: "go here" - ] + ], + timezone: "Etc/UTC", + json_encoder: Jason ``` diff --git a/lib/logger_logstash_backend.ex b/lib/logger_logstash_backend.ex index 905adaa..ca357f5 100644 --- a/lib/logger_logstash_backend.ex +++ b/lib/logger_logstash_backend.ex @@ -15,7 +15,6 @@ ################################################################################ defmodule LoggerLogstashBackend do @behaviour :gen_event - use Timex def init({__MODULE__, name}) do {:ok, configure(name, [])} @@ -34,11 +33,13 @@ defmodule LoggerLogstashBackend do end def handle_event( - {level, _gl, {Logger, msg, ts, md}}, %{level: min_level} = state - ) do + {level, _gl, {Logger, msg, ts, md}}, + %{level: min_level} = state + ) do if is_nil(min_level) or Logger.compare_levels(level, min_level) != :lt do - log_event level, msg, ts, md, state + log_event(level, msg, ts, md, state) end + {:ok, state} end @@ -50,46 +51,62 @@ defmodule LoggerLogstashBackend do :ok end - defp log_event( - level, msg, ts, md, %{ - host: host, - port: port, - type: type, - metadata: metadata, - socket: socket - } - ) do - fields = md - |> Keyword.merge(metadata) - |> Enum.into(%{}) - |> Map.put(:level, to_string(level)) - |> inspect_pids - - {{year, month, day}, {hour, minute, second, milliseconds}} = ts - {:ok, ts} = NaiveDateTime.new( - year, month, day, hour, minute, second, (milliseconds * 1000) - ) - ts = Timex.to_datetime ts, Timezone.local - {:ok, json} = JSX.encode %{ - type: type, - "@timestamp": Timex.format!(ts, "{ISO:Extended}"), - message: to_string(msg), - fields: fields - } - :gen_udp.send socket, host, port, to_charlist(json) + defp log_event(level, msg, ts, md, %{ + host: host, + port: port, + type: type, + metadata: metadata, + socket: socket, + timezone: timezone, + json_encoder: json_encoder + }) do + fields = + md + |> Keyword.merge(metadata) + |> Enum.into(%{}) + |> Map.put(:level, to_string(level)) + |> inspect_pids + + {{year, month, day}, {hour, minute, second, microseconds}} = ts + + {:ok, ts} = + NaiveDateTime.new( + year, + month, + day, + hour, + minute, + second, + microseconds * 1000 + ) + + {:ok, datetime} = DateTime.from_naive(ts, timezone) + + {:ok, json} = + json_encoder.encode(%{ + type: type, + "@timestamp": DateTime.to_iso8601(datetime), + message: to_string(msg), + fields: fields + }) + + :gen_udp.send(socket, host, port, to_charlist(json)) end defp configure(name, opts) do - env = Application.get_env :logger, name, [] - opts = Keyword.merge env, opts - Application.put_env :logger, name, opts - - level = Keyword.get opts, :level, :debug - metadata = Keyword.get opts, :metadata, [] - type = Keyword.get opts, :type, "elixir" - host = Keyword.get opts, :host - port = Keyword.get opts, :port - {:ok, socket} = :gen_udp.open 0 + env = Application.get_env(:logger, name, []) + opts = Keyword.merge(env, opts) + Application.put_env(:logger, name, opts) + + level = Keyword.get(opts, :level, :debug) + metadata = Keyword.get(opts, :metadata, []) + type = Keyword.get(opts, :type, "elixir") + host = Keyword.get(opts, :host) + port = Keyword.get(opts, :port) + timezone = Keyword.get(opts, :timezone, "Etc/UTC") + json_encoder = Keyword.get(opts, :json_encoder, Jason) + {:ok, socket} = :gen_udp.open(0) + %{ name: name, host: to_charlist(host), @@ -97,7 +114,9 @@ defmodule LoggerLogstashBackend do level: level, socket: socket, type: type, - metadata: metadata + metadata: metadata, + timezone: timezone, + json_encoder: json_encoder } end @@ -107,8 +126,8 @@ defmodule LoggerLogstashBackend do # inspects the field values only if they are pids defp inspect_pids(fields) when is_map(fields) do - Enum.into fields, %{}, fn {key, value} -> + Enum.into(fields, %{}, fn {key, value} -> {key, inspect_pid(value)} - end + end) end end diff --git a/mix.exs b/mix.exs index 97ebf83..a5bd852 100644 --- a/mix.exs +++ b/mix.exs @@ -2,26 +2,27 @@ defmodule LoggerLogstashBackend.Mixfile do use Mix.Project def project do - [app: :logger_logstash_backend, - name: "logger_logstash_backend", - source_url: "https://github.com/marcelog/logger_logstash_backend", - version: "5.0.0", - elixir: "~> 1.3", - description: description(), - package: package(), - deps: deps()] + [ + app: :logger_logstash_backend, + name: "logger_logstash_backend", + source_url: "https://github.com/marcelog/logger_logstash_backend", + version: "5.0.0", + elixir: "~> 1.3", + description: description(), + package: package(), + deps: deps() + ] end def application do - [applications: [:logger, :timex, :exjsx]] + [applications: [:logger]] end defp deps do [ {:earmark, "~> 1.0.3", only: :dev}, {:ex_doc, "~> 0.14.5", only: :dev}, - {:exjsx, "~> 3.2.1"}, - {:timex, "~> 3.1.8"} + {:jason, "~> 1.0", only: [:dev, :test]} ] end diff --git a/mix.lock b/mix.lock index 7183b40..0794cbc 100644 --- a/mix.lock +++ b/mix.lock @@ -1,4 +1,5 @@ -%{"certifi": {:hex, :certifi, "0.7.0", "861a57f3808f7eb0c2d1802afeaae0fa5de813b0df0979153cbafcd853ababaf", [:rebar3], []}, +%{ + "certifi": {:hex, :certifi, "0.7.0", "861a57f3808f7eb0c2d1802afeaae0fa5de813b0df0979153cbafcd853ababaf", [:rebar3], []}, "combine": {:hex, :combine, "0.9.6", "8d1034a127d4cbf6924c8a5010d3534d958085575fa4d9b878f200d79ac78335", [:mix], []}, "earmark": {:hex, :earmark, "1.0.3", "89bdbaf2aca8bbb5c97d8b3b55c5dd0cff517ecc78d417e87f1d0982e514557b", [:mix], []}, "ex_doc": {:hex, :ex_doc, "0.14.5", "c0433c8117e948404d93ca69411dd575ec6be39b47802e81ca8d91017a0cf83c", [:mix], [{:earmark, "~> 1.0", [hex: :earmark, optional: false]}]}, @@ -6,9 +7,11 @@ "gettext": {:hex, :gettext, "0.13.0", "daafbddc5cda12738bb93b01d84105fe75b916a302f1c50ab9fb066b95ec9db4", [:mix], []}, "hackney": {:hex, :hackney, "1.6.5", "8c025ee397ac94a184b0743c73b33b96465e85f90a02e210e86df6cbafaa5065", [:rebar3], [{:certifi, "0.7.0", [hex: :certifi, optional: false]}, {:idna, "1.2.0", [hex: :idna, optional: false]}, {:metrics, "1.0.1", [hex: :metrics, optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, optional: false]}]}, "idna": {:hex, :idna, "1.2.0", "ac62ee99da068f43c50dc69acf700e03a62a348360126260e87f2b54eced86b2", [:rebar3], []}, + "jason": {:hex, :jason, "1.0.0", "0f7cfa9bdb23fed721ec05419bcee2b2c21a77e926bce0deda029b5adc716fe2", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"}, "jsx": {:hex, :jsx, "2.8.1", "1453b4eb3615acb3e2cd0a105d27e6761e2ed2e501ac0b390f5bbec497669846", [:mix, :rebar3], []}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], []}, "mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [:rebar3], []}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:make, :rebar], []}, "timex": {:hex, :timex, "3.1.8", "a32f636c4260dd7515a3767be3d3a163dc09d15f0e9e689254b7bab152b29209", [:mix], [{:combine, "~> 0.7", [hex: :combine, optional: false]}, {:gettext, "~> 0.10", [hex: :gettext, optional: false]}, {:tzdata, "~> 0.1.8 or ~> 0.5", [hex: :tzdata, optional: false]}]}, - "tzdata": {:hex, :tzdata, "0.5.10", "087e8dfe8c0283473115ad8ca6974b898ecb55ca5c725427a142a79593391e90", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, optional: false]}]}} + "tzdata": {:hex, :tzdata, "0.5.10", "087e8dfe8c0283473115ad8ca6974b898ecb55ca5c725427a142a79593391e90", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, optional: false]}]}, +} diff --git a/test/logger_logstash_backend_test.exs b/test/logger_logstash_backend_test.exs index 20ea190..d1335b1 100644 --- a/test/logger_logstash_backend_test.exs +++ b/test/logger_logstash_backend_test.exs @@ -16,13 +16,13 @@ defmodule LoggerLogstashBackendTest do use ExUnit.Case, async: false require Logger - use Timex @backend {LoggerLogstashBackend, :test} - Logger.add_backend @backend + Logger.add_backend(@backend) setup do - Logger.configure_backend @backend, [ + Logger.configure_backend( + @backend, host: "127.0.0.1", port: 10001, level: :info, @@ -30,69 +30,79 @@ defmodule LoggerLogstashBackendTest do metadata: [ some_metadata: "go here" ] - ] - {:ok, socket} = :gen_udp.open 10001, [:binary, {:active, true}] - on_exit fn -> - :ok = :gen_udp.close socket - end + ) + + {:ok, socket} = :gen_udp.open(10001, [:binary, {:active, true}]) + + on_exit(fn -> + :ok = :gen_udp.close(socket) + end) + :ok end test "can log" do - Logger.info "hello world", [key1: "field1"] + Logger.info("hello world", key1: "field1") json = get_log() - {:ok, data} = JSX.decode json + {:ok, data} = Jason.decode(json) assert data["type"] === "some_app" assert data["message"] === "hello world" + expected = %{ "function" => "test can log/1", "level" => "info", "module" => "Elixir.LoggerLogstashBackendTest", - "pid" => (inspect self()), + "pid" => inspect(self()), "some_metadata" => "go here", - "line" => 42, + "line" => 45, "key1" => "field1" } + + assert data["fields"]["line"] == expected["line"] assert contains?(data["fields"], expected) - {:ok, ts} = Timex.parse data["@timestamp"], "{ISO:Extended}" - ts = Timex.to_unix ts + {:ok, dt, _tz_offset} = DateTime.from_iso8601(data["@timestamp"]) + ts = DateTime.to_unix(dt) - now = Timex.to_unix Timex.local - assert (now - ts) < 1000 + now = DateTime.utc_now() |> DateTime.to_unix() + assert now - ts < 1000 end test "can log pids" do - Logger.info "pid", [pid_key: self()] + Logger.info("pid", pid_key: self()) json = get_log() - {:ok, data} = JSX.decode json + {:ok, data} = Jason.decode(json) assert data["type"] === "some_app" assert data["message"] === "pid" + expected = %{ "function" => "test can log pids/1", "level" => "info", "module" => "Elixir.LoggerLogstashBackendTest", - "pid" => (inspect self()), + "pid" => inspect(self()), "pid_key" => inspect(self()), "some_metadata" => "go here", - "line" => 65 + "line" => 71 } + + assert data["fields"]["line"] == expected["line"] assert contains?(data["fields"], expected) - {:ok, ts} = Timex.parse data["@timestamp"], "{ISO:Extended}" - ts = Timex.to_unix ts + {:ok, dt, _tz_offset} = DateTime.from_iso8601(data["@timestamp"]) + ts = DateTime.to_unix(dt) - now = Timex.to_unix Timex.local - assert (now - ts) < 1000 + now = DateTime.utc_now() |> DateTime.to_unix() + assert now - ts < 1000 end test "cant log when minor levels" do - Logger.debug "hello world", [key1: "field1"] + Logger.debug("hello world", key1: "field1") :nothing_received = get_log() end defp get_log do receive do {:udp, _, _, _, json} -> json - after 500 -> :nothing_received + after + 500 -> :nothing_received end end From fa2138806f7e34a79621edf8f77c61a7afd53f9b Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Thu, 19 Apr 2018 11:31:12 +0200 Subject: [PATCH 02/17] Update TravisCI build targets --- .travis.yml | 2 -- mix.exs | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index a8a0f95..8d14536 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,6 @@ language: elixir elixir: - - 1.3.0 - 1.4.0 - 1.5.0 otp_release: - 18.0 - diff --git a/mix.exs b/mix.exs index a5bd852..b30b221 100644 --- a/mix.exs +++ b/mix.exs @@ -7,7 +7,7 @@ defmodule LoggerLogstashBackend.Mixfile do name: "logger_logstash_backend", source_url: "https://github.com/marcelog/logger_logstash_backend", version: "5.0.0", - elixir: "~> 1.3", + elixir: "~> 1.4", description: description(), package: package(), deps: deps() From 29ead759f293bddfbc4059bd949c686e0f792ed1 Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Mon, 11 Jun 2018 11:50:34 +0200 Subject: [PATCH 03/17] Clean up deps & add note to install `Jason` to use it. --- README.md | 2 +- mix.exs | 2 +- mix.lock | 12 ------------ 3 files changed, 2 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 8b71913..825225c 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ that will send logs to the [Logstash UDP input](https://www.elastic.co/guide/en/ * **level**: Atom. Minimum level for this backend. * **type**: String.t. Type of logs. Useful to filter in logstash. * **timezone**: String.t. Server timezone. Used to convert from naive timestamp. Default `"Etc/UTC"`. - * **json_encoder**: Atom. Module to be used for JSON encoding. Default `Jason`. + * **json_encoder**: Atom. Module to be used for JSON encoding. The default is `Jason`, add it to your mix deps to use. ## Sample Logstash config ``` diff --git a/mix.exs b/mix.exs index b30b221..7657755 100644 --- a/mix.exs +++ b/mix.exs @@ -22,7 +22,7 @@ defmodule LoggerLogstashBackend.Mixfile do [ {:earmark, "~> 1.0.3", only: :dev}, {:ex_doc, "~> 0.14.5", only: :dev}, - {:jason, "~> 1.0", only: [:dev, :test]} + {:jason, "~> 1.0", only: [:test]} ] end diff --git a/mix.lock b/mix.lock index 0794cbc..afe91c3 100644 --- a/mix.lock +++ b/mix.lock @@ -1,17 +1,5 @@ %{ - "certifi": {:hex, :certifi, "0.7.0", "861a57f3808f7eb0c2d1802afeaae0fa5de813b0df0979153cbafcd853ababaf", [:rebar3], []}, - "combine": {:hex, :combine, "0.9.6", "8d1034a127d4cbf6924c8a5010d3534d958085575fa4d9b878f200d79ac78335", [:mix], []}, "earmark": {:hex, :earmark, "1.0.3", "89bdbaf2aca8bbb5c97d8b3b55c5dd0cff517ecc78d417e87f1d0982e514557b", [:mix], []}, "ex_doc": {:hex, :ex_doc, "0.14.5", "c0433c8117e948404d93ca69411dd575ec6be39b47802e81ca8d91017a0cf83c", [:mix], [{:earmark, "~> 1.0", [hex: :earmark, optional: false]}]}, - "exjsx": {:hex, :exjsx, "3.2.1", "1bc5bf1e4fd249104178f0885030bcd75a4526f4d2a1e976f4b428d347614f0f", [:mix], [{:jsx, "~> 2.8.0", [hex: :jsx, optional: false]}]}, - "gettext": {:hex, :gettext, "0.13.0", "daafbddc5cda12738bb93b01d84105fe75b916a302f1c50ab9fb066b95ec9db4", [:mix], []}, - "hackney": {:hex, :hackney, "1.6.5", "8c025ee397ac94a184b0743c73b33b96465e85f90a02e210e86df6cbafaa5065", [:rebar3], [{:certifi, "0.7.0", [hex: :certifi, optional: false]}, {:idna, "1.2.0", [hex: :idna, optional: false]}, {:metrics, "1.0.1", [hex: :metrics, optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, optional: false]}]}, - "idna": {:hex, :idna, "1.2.0", "ac62ee99da068f43c50dc69acf700e03a62a348360126260e87f2b54eced86b2", [:rebar3], []}, "jason": {:hex, :jason, "1.0.0", "0f7cfa9bdb23fed721ec05419bcee2b2c21a77e926bce0deda029b5adc716fe2", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"}, - "jsx": {:hex, :jsx, "2.8.1", "1453b4eb3615acb3e2cd0a105d27e6761e2ed2e501ac0b390f5bbec497669846", [:mix, :rebar3], []}, - "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], []}, - "mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [:rebar3], []}, - "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:make, :rebar], []}, - "timex": {:hex, :timex, "3.1.8", "a32f636c4260dd7515a3767be3d3a163dc09d15f0e9e689254b7bab152b29209", [:mix], [{:combine, "~> 0.7", [hex: :combine, optional: false]}, {:gettext, "~> 0.10", [hex: :gettext, optional: false]}, {:tzdata, "~> 0.1.8 or ~> 0.5", [hex: :tzdata, optional: false]}]}, - "tzdata": {:hex, :tzdata, "0.5.10", "087e8dfe8c0283473115ad8ca6974b898ecb55ca5c725427a142a79593391e90", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, optional: false]}]}, } From 31ba9ea1d992515bbb58dc04d1e73cb4dd8e8496 Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Wed, 27 Jun 2018 13:14:47 +0200 Subject: [PATCH 04/17] Allow configuring protocol and add :tcp as option --- lib/logger_logstash_backend.ex | 45 ++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/lib/logger_logstash_backend.ex b/lib/logger_logstash_backend.ex index ca357f5..873346e 100644 --- a/lib/logger_logstash_backend.ex +++ b/lib/logger_logstash_backend.ex @@ -51,15 +51,13 @@ defmodule LoggerLogstashBackend do :ok end - defp log_event(level, msg, ts, md, %{ - host: host, - port: port, - type: type, - metadata: metadata, - socket: socket, - timezone: timezone, - json_encoder: json_encoder - }) do + defp log_event( + level, + msg, + ts, + md, + %{type: type, metadata: metadata, timezone: timezone, json_encoder: json_encoder} = state + ) do fields = md |> Keyword.merge(metadata) @@ -90,7 +88,15 @@ defmodule LoggerLogstashBackend do fields: fields }) - :gen_udp.send(socket, host, port, to_charlist(json)) + send_log(state, json) + end + + defp send_log(%{protocol: :udp, socket: socket, host: host, port: port} = state, json) do + :gen_udp.send(socket, host, port, [json, "\n"]) + end + + defp send_log(%{protocol: :tcp, socket: socket} = state, json) do + :gen_tcp.send(socket, [json, "\n"]) end defp configure(name, opts) do @@ -105,19 +111,32 @@ defmodule LoggerLogstashBackend do port = Keyword.get(opts, :port) timezone = Keyword.get(opts, :timezone, "Etc/UTC") json_encoder = Keyword.get(opts, :json_encoder, Jason) - {:ok, socket} = :gen_udp.open(0) + protocol = Keyword.get(opts, :protocol, :udp) %{ name: name, host: to_charlist(host), port: port, level: level, - socket: socket, type: type, metadata: metadata, timezone: timezone, - json_encoder: json_encoder + json_encoder: json_encoder, + protocol: protocol } + |> open_socket() + end + + defp open_socket(%{protocol: :udp} = state) do + {:ok, socket} = :gen_udp.open(0) + Map.put(state, :socket, socket) + end + + defp open_socket(%{protocol: :tcp} = state) do + {:ok, socket} = + :gen_tcp.connect(state.host, state.port, [{:active, true}, :binary, {:keepalive, true}]) + + Map.put(state, :socket, socket) end # inspects the argument only if it is a pid From dcc5cbd4ff8de12acb551538c3300f5347265578 Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Wed, 27 Jun 2018 13:38:31 +0200 Subject: [PATCH 05/17] Add support for TLS connections --- lib/logger_logstash_backend.ex | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/lib/logger_logstash_backend.ex b/lib/logger_logstash_backend.ex index 873346e..5694f18 100644 --- a/lib/logger_logstash_backend.ex +++ b/lib/logger_logstash_backend.ex @@ -95,6 +95,10 @@ defmodule LoggerLogstashBackend do :gen_udp.send(socket, host, port, [json, "\n"]) end + defp send_log(%{protocol: :tcp, ssl: true, socket: socket} = state, json) do + :ssl.send(socket, [json, "\n"]) + end + defp send_log(%{protocol: :tcp, socket: socket} = state, json) do :gen_tcp.send(socket, [json, "\n"]) end @@ -112,6 +116,11 @@ defmodule LoggerLogstashBackend do timezone = Keyword.get(opts, :timezone, "Etc/UTC") json_encoder = Keyword.get(opts, :json_encoder, Jason) protocol = Keyword.get(opts, :protocol, :udp) + ssl = Keyword.get(opts, :ssl, false) + + if ssl && protocol == :udp do + raise ArgumentError, message: "cannot use SSL in combination with UDP. Use TCP instead" + end %{ name: name, @@ -122,7 +131,8 @@ defmodule LoggerLogstashBackend do metadata: metadata, timezone: timezone, json_encoder: json_encoder, - protocol: protocol + protocol: protocol, + ssl: ssl } |> open_socket() end @@ -132,6 +142,14 @@ defmodule LoggerLogstashBackend do Map.put(state, :socket, socket) end + defp open_socket(%{protocol: :tcp, ssl: true} = state) do + {:ok, socket} = + :gen_tcp.connect(state.host, state.port, [{:active, true}, :binary, {:keepalive, true}]) + + socket = :ssl.connect(socket, fail_if_no_peer_cert: true) + Map.put(state, :socket, socket) + end + defp open_socket(%{protocol: :tcp} = state) do {:ok, socket} = :gen_tcp.connect(state.host, state.port, [{:active, true}, :binary, {:keepalive, true}]) From 72495791cbf0dffd1e84536a8bdd32e5b5274cce Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Wed, 27 Jun 2018 13:40:27 +0200 Subject: [PATCH 06/17] Remove unused variables --- lib/logger_logstash_backend.ex | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/logger_logstash_backend.ex b/lib/logger_logstash_backend.ex index 5694f18..cc64031 100644 --- a/lib/logger_logstash_backend.ex +++ b/lib/logger_logstash_backend.ex @@ -91,15 +91,15 @@ defmodule LoggerLogstashBackend do send_log(state, json) end - defp send_log(%{protocol: :udp, socket: socket, host: host, port: port} = state, json) do + defp send_log(%{protocol: :udp, socket: socket, host: host, port: port}, json) do :gen_udp.send(socket, host, port, [json, "\n"]) end - defp send_log(%{protocol: :tcp, ssl: true, socket: socket} = state, json) do + defp send_log(%{protocol: :tcp, ssl: true, socket: socket}, json) do :ssl.send(socket, [json, "\n"]) end - defp send_log(%{protocol: :tcp, socket: socket} = state, json) do + defp send_log(%{protocol: :tcp, socket: socket}, json) do :gen_tcp.send(socket, [json, "\n"]) end From 4ade7c557e3d1c483d7b9d09b7b57f052a34cb22 Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Wed, 27 Jun 2018 13:42:16 +0200 Subject: [PATCH 07/17] Pattern match output of :ssl.connect/2 --- lib/logger_logstash_backend.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logger_logstash_backend.ex b/lib/logger_logstash_backend.ex index cc64031..fc5366f 100644 --- a/lib/logger_logstash_backend.ex +++ b/lib/logger_logstash_backend.ex @@ -146,7 +146,7 @@ defmodule LoggerLogstashBackend do {:ok, socket} = :gen_tcp.connect(state.host, state.port, [{:active, true}, :binary, {:keepalive, true}]) - socket = :ssl.connect(socket, fail_if_no_peer_cert: true) + {:ok, socket} = :ssl.connect(socket, fail_if_no_peer_cert: true) Map.put(state, :socket, socket) end From 801101504f19c107f417e9b2598ae257d1bd7d35 Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Wed, 27 Jun 2018 13:48:17 +0200 Subject: [PATCH 08/17] Match on output of `send` functions --- lib/logger_logstash_backend.ex | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/logger_logstash_backend.ex b/lib/logger_logstash_backend.ex index fc5366f..ee60b43 100644 --- a/lib/logger_logstash_backend.ex +++ b/lib/logger_logstash_backend.ex @@ -92,15 +92,15 @@ defmodule LoggerLogstashBackend do end defp send_log(%{protocol: :udp, socket: socket, host: host, port: port}, json) do - :gen_udp.send(socket, host, port, [json, "\n"]) + :ok = :gen_udp.send(socket, host, port, [json, "\n"]) end defp send_log(%{protocol: :tcp, ssl: true, socket: socket}, json) do - :ssl.send(socket, [json, "\n"]) + :ok = :ssl.send(socket, [json, "\n"]) end defp send_log(%{protocol: :tcp, socket: socket}, json) do - :gen_tcp.send(socket, [json, "\n"]) + :ok = :gen_tcp.send(socket, [json, "\n"]) end defp configure(name, opts) do From ea1d9e454902ae538e8079d1f81c636b59a4d2d6 Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Wed, 27 Jun 2018 13:56:08 +0200 Subject: [PATCH 09/17] Add fields to top level --- lib/logger_logstash_backend.ex | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/lib/logger_logstash_backend.ex b/lib/logger_logstash_backend.ex index ee60b43..62f9863 100644 --- a/lib/logger_logstash_backend.ex +++ b/lib/logger_logstash_backend.ex @@ -80,13 +80,17 @@ defmodule LoggerLogstashBackend do {:ok, datetime} = DateTime.from_naive(ts, timezone) - {:ok, json} = - json_encoder.encode(%{ - type: type, - "@timestamp": DateTime.to_iso8601(datetime), - message: to_string(msg), - fields: fields - }) + message = + Map.merge( + %{ + type: type, + "@timestamp": DateTime.to_iso8601(datetime), + message: to_string(msg) + }, + fields + ) + + {:ok, json} = json_encoder.encode(message) send_log(state, json) end From 54bec9563407c1ca77f19b0647402527228d2c02 Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Wed, 27 Jun 2018 15:28:29 +0200 Subject: [PATCH 10/17] Update tests to reflect fields being merged directly into json --- test/logger_logstash_backend_test.exs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/logger_logstash_backend_test.exs b/test/logger_logstash_backend_test.exs index d1335b1..6dcac91 100644 --- a/test/logger_logstash_backend_test.exs +++ b/test/logger_logstash_backend_test.exs @@ -58,8 +58,8 @@ defmodule LoggerLogstashBackendTest do "key1" => "field1" } - assert data["fields"]["line"] == expected["line"] - assert contains?(data["fields"], expected) + assert data["line"] == expected["line"] + assert contains?(data, expected) {:ok, dt, _tz_offset} = DateTime.from_iso8601(data["@timestamp"]) ts = DateTime.to_unix(dt) @@ -84,8 +84,8 @@ defmodule LoggerLogstashBackendTest do "line" => 71 } - assert data["fields"]["line"] == expected["line"] - assert contains?(data["fields"], expected) + assert data["line"] == expected["line"] + assert contains?(data, expected) {:ok, dt, _tz_offset} = DateTime.from_iso8601(data["@timestamp"]) ts = DateTime.to_unix(dt) From 4f9f1b720ba6dcfd21b9d4c85d798cd7a9847b5f Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Fri, 20 Jul 2018 09:18:23 +0200 Subject: [PATCH 11/17] Handle unavailable connections by attempting reconnect on every log --- lib/logger_logstash_backend.ex | 70 +++++++++++++++++++++++++--------- 1 file changed, 53 insertions(+), 17 deletions(-) diff --git a/lib/logger_logstash_backend.ex b/lib/logger_logstash_backend.ex index 62f9863..1ab7786 100644 --- a/lib/logger_logstash_backend.ex +++ b/lib/logger_logstash_backend.ex @@ -16,11 +16,14 @@ defmodule LoggerLogstashBackend do @behaviour :gen_event + require Logger + def init({__MODULE__, name}) do {:ok, configure(name, [])} end - def handle_call({:configure, opts}, %{name: name}) do + def handle_call({:configure, opts}, %{name: name} = state) do + close_socket(state) {:ok, :ok, configure(name, opts)} end @@ -36,11 +39,13 @@ defmodule LoggerLogstashBackend do {level, _gl, {Logger, msg, ts, md}}, %{level: min_level} = state ) do + new_state = open_socket(state) + if is_nil(min_level) or Logger.compare_levels(level, min_level) != :lt do - log_event(level, msg, ts, md, state) + log_event(level, msg, ts, md, new_state) end - {:ok, state} + {:ok, new_state} end def code_change(_old_vsn, state, _extra) do @@ -95,6 +100,8 @@ defmodule LoggerLogstashBackend do send_log(state, json) end + defp send_log(%{socket: nil}, _json), do: nil + defp send_log(%{protocol: :udp, socket: socket, host: host, port: port}, json) do :ok = :gen_udp.send(socket, host, port, [json, "\n"]) end @@ -136,31 +143,60 @@ defmodule LoggerLogstashBackend do timezone: timezone, json_encoder: json_encoder, protocol: protocol, - ssl: ssl + ssl: ssl, + socket: nil } - |> open_socket() end - defp open_socket(%{protocol: :udp} = state) do - {:ok, socket} = :gen_udp.open(0) - Map.put(state, :socket, socket) + defp close_socket(%{socket: nil} = state), do: state + + defp close_socket(%{protocol: :udp} = state) do + :ok = :gen_udp.close(state.socket) + %{state | socket: nil} + end + + defp close_socket(%{protocol: :tcp} = state) do + :gen_tcp.shutdown(state.socket, :write) + %{state | socket: nil} end - defp open_socket(%{protocol: :tcp, ssl: true} = state) do - {:ok, socket} = - :gen_tcp.connect(state.host, state.port, [{:active, true}, :binary, {:keepalive, true}]) + defp open_socket(%{protocol: :udp, socket: nil} = state) do + case :gen_udp.open(0) do + {:ok, socket} -> + Map.put(state, :socket, socket) - {:ok, socket} = :ssl.connect(socket, fail_if_no_peer_cert: true) - Map.put(state, :socket, socket) + {:error, reason} -> + Logger.info("could not connect to logstash via udp, reason: #{inspect(reason)}") + state + end + end + + defp open_socket(%{protocol: :tcp, ssl: true, socket: nil} = state) do + with {:ok, socket} <- + :gen_tcp.connect(state.host, state.port, [{:active, true}, :binary, {:keepalive, true}]), + {:ok, socket} <- :ssl.connect(socket, fail_if_no_peer_cert: true) do + Map.put(state, :socket, socket) + else + {:error, reason} -> + Logger.info("could not connect via tcp (with ssl), reason: #{inspect(reason)}") + state + end end - defp open_socket(%{protocol: :tcp} = state) do - {:ok, socket} = - :gen_tcp.connect(state.host, state.port, [{:active, true}, :binary, {:keepalive, true}]) + defp open_socket(%{protocol: :tcp, socket: nil} = state) do + :gen_tcp.connect(state.host, state.port, [{:active, true}, :binary, {:keepalive, true}]) + |> case do + {:ok, socket} -> + Map.put(state, :socket, socket) - Map.put(state, :socket, socket) + {:error, reason} -> + Logger.info("could not connect via tcp, reason: #{inspect(reason)}") + state + end end + defp open_socket(state), do: state + # inspects the argument only if it is a pid defp inspect_pid(pid) when is_pid(pid), do: inspect(pid) defp inspect_pid(other), do: other From 42fa0022ba84e0b6224060d4fbfc1393abee42e6 Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Fri, 20 Jul 2018 09:34:00 +0200 Subject: [PATCH 12/17] Only log error once --- lib/logger_logstash_backend.ex | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/lib/logger_logstash_backend.ex b/lib/logger_logstash_backend.ex index 1ab7786..280c34a 100644 --- a/lib/logger_logstash_backend.ex +++ b/lib/logger_logstash_backend.ex @@ -144,7 +144,8 @@ defmodule LoggerLogstashBackend do json_encoder: json_encoder, protocol: protocol, ssl: ssl, - socket: nil + socket: nil, + recorded_error: false } end @@ -160,14 +161,20 @@ defmodule LoggerLogstashBackend do %{state | socket: nil} end + defp log_error(error, %{recorded_error: false} = state) do + Logger.info( + "could not connect to logstash via #{inspect(state.protocol)}, reason: #{inspect(error)}" + ) + + %{state | recorded_error: true} + end + + defp log_error(_error, state), do: state + defp open_socket(%{protocol: :udp, socket: nil} = state) do case :gen_udp.open(0) do - {:ok, socket} -> - Map.put(state, :socket, socket) - - {:error, reason} -> - Logger.info("could not connect to logstash via udp, reason: #{inspect(reason)}") - state + {:ok, socket} -> Map.put(state, :socket, socket) + {:error, reason} -> log_error(reason, state) end end @@ -177,21 +184,15 @@ defmodule LoggerLogstashBackend do {:ok, socket} <- :ssl.connect(socket, fail_if_no_peer_cert: true) do Map.put(state, :socket, socket) else - {:error, reason} -> - Logger.info("could not connect via tcp (with ssl), reason: #{inspect(reason)}") - state + {:error, reason} -> log_error(reason, state) end end defp open_socket(%{protocol: :tcp, socket: nil} = state) do :gen_tcp.connect(state.host, state.port, [{:active, true}, :binary, {:keepalive, true}]) |> case do - {:ok, socket} -> - Map.put(state, :socket, socket) - - {:error, reason} -> - Logger.info("could not connect via tcp, reason: #{inspect(reason)}") - state + {:ok, socket} -> Map.put(state, :socket, socket) + {:error, reason} -> log_error(reason, state) end end From bde392e0994bb39098ac328ad58049db3eb45e2f Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Fri, 20 Jul 2018 09:36:43 +0200 Subject: [PATCH 13/17] Log as error --- lib/logger_logstash_backend.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logger_logstash_backend.ex b/lib/logger_logstash_backend.ex index 280c34a..7861c85 100644 --- a/lib/logger_logstash_backend.ex +++ b/lib/logger_logstash_backend.ex @@ -162,7 +162,7 @@ defmodule LoggerLogstashBackend do end defp log_error(error, %{recorded_error: false} = state) do - Logger.info( + Logger.error( "could not connect to logstash via #{inspect(state.protocol)}, reason: #{inspect(error)}" ) From 8f87e86fd917ce382472406e4b0ad62a2692bbbf Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Fri, 20 Jul 2018 10:03:28 +0200 Subject: [PATCH 14/17] Reset recorded_error when connection has been made --- lib/logger_logstash_backend.ex | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/logger_logstash_backend.ex b/lib/logger_logstash_backend.ex index 7861c85..0619266 100644 --- a/lib/logger_logstash_backend.ex +++ b/lib/logger_logstash_backend.ex @@ -173,7 +173,7 @@ defmodule LoggerLogstashBackend do defp open_socket(%{protocol: :udp, socket: nil} = state) do case :gen_udp.open(0) do - {:ok, socket} -> Map.put(state, :socket, socket) + {:ok, socket} -> Map.merge(state, %{socket: socket, recorded_error: false}) {:error, reason} -> log_error(reason, state) end end @@ -182,7 +182,7 @@ defmodule LoggerLogstashBackend do with {:ok, socket} <- :gen_tcp.connect(state.host, state.port, [{:active, true}, :binary, {:keepalive, true}]), {:ok, socket} <- :ssl.connect(socket, fail_if_no_peer_cert: true) do - Map.put(state, :socket, socket) + Map.merge(state, %{socket: socket, recorded_error: false}) else {:error, reason} -> log_error(reason, state) end @@ -191,7 +191,7 @@ defmodule LoggerLogstashBackend do defp open_socket(%{protocol: :tcp, socket: nil} = state) do :gen_tcp.connect(state.host, state.port, [{:active, true}, :binary, {:keepalive, true}]) |> case do - {:ok, socket} -> Map.put(state, :socket, socket) + {:ok, socket} -> Map.merge(state, %{socket: socket, recorded_error: false}) {:error, reason} -> log_error(reason, state) end end From 2c0d0faa7b88fa39d4def3cbca75ba7dd6c04994 Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Wed, 29 Aug 2018 13:20:42 +0200 Subject: [PATCH 15/17] Set a connection timeout of 10s and a send timeout of 10s. The default value for connection timeout is infinity, in my case it was waiting for an hour before giving up, which is an awfully long time. --- lib/logger_logstash_backend.ex | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/logger_logstash_backend.ex b/lib/logger_logstash_backend.ex index 0619266..c45702d 100644 --- a/lib/logger_logstash_backend.ex +++ b/lib/logger_logstash_backend.ex @@ -189,7 +189,12 @@ defmodule LoggerLogstashBackend do end defp open_socket(%{protocol: :tcp, socket: nil} = state) do - :gen_tcp.connect(state.host, state.port, [{:active, true}, :binary, {:keepalive, true}]) + :gen_tcp.connect( + state.host, + state.port, + [{:active, true}, :binary, {:keepalive, true}, {:send_timeout, 10_000}], + 10_000 + ) |> case do {:ok, socket} -> Map.merge(state, %{socket: socket, recorded_error: false}) {:error, reason} -> log_error(reason, state) From 0ef890dd84907b3e34a59fa235e74b416fef9fc4 Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Tue, 16 Oct 2018 15:22:08 +0200 Subject: [PATCH 16/17] Handle closed connections (log error and mark socket as closed) --- lib/logger_logstash_backend.ex | 44 ++++++++++++++++++++++++++-------- 1 file changed, 34 insertions(+), 10 deletions(-) diff --git a/lib/logger_logstash_backend.ex b/lib/logger_logstash_backend.ex index c45702d..bc49df0 100644 --- a/lib/logger_logstash_backend.ex +++ b/lib/logger_logstash_backend.ex @@ -41,9 +41,12 @@ defmodule LoggerLogstashBackend do ) do new_state = open_socket(state) - if is_nil(min_level) or Logger.compare_levels(level, min_level) != :lt do - log_event(level, msg, ts, md, new_state) - end + new_state = + if is_nil(min_level) or Logger.compare_levels(level, min_level) != :lt do + log_event(level, msg, ts, md, new_state) + else + new_state + end {:ok, new_state} end @@ -100,18 +103,39 @@ defmodule LoggerLogstashBackend do send_log(state, json) end - defp send_log(%{socket: nil}, _json), do: nil + defp send_log(%{socket: nil} = state, _json), do: state + + defp send_log(%{protocol: :udp, socket: socket, host: host, port: port} = state, json) do + case :gen_udp.send(socket, host, port, [json, "\n"]) do + :ok -> + state - defp send_log(%{protocol: :udp, socket: socket, host: host, port: port}, json) do - :ok = :gen_udp.send(socket, host, port, [json, "\n"]) + {:error, :closed} -> + log_error(:closed, state) + %{state | socket: nil} + end end - defp send_log(%{protocol: :tcp, ssl: true, socket: socket}, json) do - :ok = :ssl.send(socket, [json, "\n"]) + defp send_log(%{protocol: :tcp, ssl: true, socket: socket} = state, json) do + case :ssl.send(socket, [json, "\n"]) do + :ok -> + state + + {:error, :closed} -> + log_error(:closed, state) + %{state | socket: nil} + end end - defp send_log(%{protocol: :tcp, socket: socket}, json) do - :ok = :gen_tcp.send(socket, [json, "\n"]) + defp send_log(%{protocol: :tcp, socket: socket} = state, json) do + case :gen_tcp.send(socket, [json, "\n"]) do + :ok -> + state + + {:error, :closed} -> + log_error(:closed, state) + %{state | socket: nil} + end end defp configure(name, opts) do From 1fbe5796de8f9d9d3aaef87c0fe0d98daea46b76 Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Fri, 23 Nov 2018 13:21:05 +0100 Subject: [PATCH 17/17] Specify timeout for :ssl.connect (and reduce other timeouts) --- lib/logger_logstash_backend.ex | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/lib/logger_logstash_backend.ex b/lib/logger_logstash_backend.ex index bc49df0..9fe8d0b 100644 --- a/lib/logger_logstash_backend.ex +++ b/lib/logger_logstash_backend.ex @@ -204,8 +204,14 @@ defmodule LoggerLogstashBackend do defp open_socket(%{protocol: :tcp, ssl: true, socket: nil} = state) do with {:ok, socket} <- - :gen_tcp.connect(state.host, state.port, [{:active, true}, :binary, {:keepalive, true}]), - {:ok, socket} <- :ssl.connect(socket, fail_if_no_peer_cert: true) do + :gen_tcp.connect(state.host, state.port, [ + {:active, true}, + :binary, + {:keepalive, true}, + {:send_timeout, 5000}, + {:send_timeout_close, true} + ]), + {:ok, socket} <- :ssl.connect(socket, [fail_if_no_peer_cert: true], 5000) do Map.merge(state, %{socket: socket, recorded_error: false}) else {:error, reason} -> log_error(reason, state) @@ -216,8 +222,14 @@ defmodule LoggerLogstashBackend do :gen_tcp.connect( state.host, state.port, - [{:active, true}, :binary, {:keepalive, true}, {:send_timeout, 10_000}], - 10_000 + [ + {:active, true}, + :binary, + {:keepalive, true}, + {:send_timeout, 5000}, + {:send_timeout_close, true} + ], + 5000 ) |> case do {:ok, socket} -> Map.merge(state, %{socket: socket, recorded_error: false})