Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
use Mix.Config

config :logger,
# disable console backend, so it does not clutter `mix test` output
backends: []
136 changes: 117 additions & 19 deletions lib/logger_logstash_backend.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,80 @@
# limitations under the License.
################################################################################
defmodule LoggerLogstashBackend do
alias LoggerLogstashBackend.Socket

use GenEvent
use Timex

# Struct

defstruct ~w(
host
level
metadata
name
port
protocol
socket
type
)a

# Functions

## GenEvent callbacks

def init({__MODULE__, name}) do
{:ok, configure(name, [])}
# trap exits, so that cleanup can occur in terminate/2
Process.flag(:trap_exit, true)

configure(name, [])
end

def handle_call({:configure, opts}, %{name: name}) do
{:ok, :ok, configure(name, opts)}
case configure(name, opts) do
{:ok, state} ->
{:ok, :ok, state}
reply = {:error, _reason} ->
{:remove_handler, reply}
end
end

def handle_event(
{level, _gl, {Logger, msg, ts, md}}, %{level: min_level} = state
) do
if is_nil(min_level) or Logger.compare_levels(level, min_level) != :lt do
log_event level, msg, ts, md, state
else
{:ok, state}
end
end

def handle_info({:tcp_closed, socket}, state = %__MODULE__{socket: socket}) do
case configure_socket(state) do
{:error, _} -> {:ok, %{state | socket: nil}}
other -> other
end
end

def handle_info(_, state) do
{:ok, state}
end

@doc """
Closes socket when the backend is removed
"""
def terminate(_, state) do
:ok = Socket.close(state)

state
end

## Private Functions

defp log_event(
level, msg, ts, md, %{
host: host,
port: port,
level, msg, ts, md, state = %{
type: type,
metadata: metadata,
socket: socket
metadata: metadata
}
) do
fields = md
Expand All @@ -56,10 +103,11 @@ defmodule LoggerLogstashBackend do
message: to_string(msg),
fields: fields
}
:gen_udp.send socket, host, port, to_char_list(json)

send_with_retry(state, json)
end

defp configure(name, opts) do
defp configure(name, opts) when is_atom(name) and is_list(opts) do
env = Application.get_env :logger, name, []
opts = Keyword.merge env, opts
Application.put_env :logger, name, opts
Expand All @@ -69,16 +117,34 @@ defmodule LoggerLogstashBackend do
type = Keyword.get opts, :type, "elixir"
host = Keyword.get opts, :host
port = Keyword.get opts, :port
{:ok, socket} = :gen_udp.open 0
%{
name: name,
host: to_char_list(host),
port: port,
level: level,
socket: socket,
type: type,
metadata: metadata
}
protocol = Keyword.get opts, :protocol, :udp

state = %__MODULE__{level: level, metadata: metadata, name: name, protocol: protocol, type: type}
|> put_destination(host, port)

new_state = case configure_socket(state) do
{:error, _} -> state
{:ok, configured_state} -> configured_state
end

{:ok, new_state}
end

defp configure_socket(state = %__MODULE__{}) do
case Socket.open state do
reply = {:error, reason} ->
IO.puts :stderr, "Could not open socket (#{url(state)}) due to reason #{inspect reason}"

reply
other ->
other
end
end

defp fallback_log(state, json, reason) do
IO.puts :stderr,
"Could not log message (#{json}) to socket (#{url(state)}) due to #{inspect reason}. " <>
"Check that state (#{inspect state}) is correct."
end

# inspects the argument only if it is a pid
Expand All @@ -91,4 +157,36 @@ defmodule LoggerLogstashBackend do
{key, inspect_pid(value)}
end
end

defp put_destination(state = %__MODULE__{}, host, port) when is_nil(host) or is_nil(port), do: state
defp put_destination(state = %__MODULE__{}, host, port), do: %{state | host: to_char_list(host), port: port}

defp send_with_retry(state = %__MODULE__{socket: nil}, json), do: send_with_new_socket(state, json)
defp send_with_retry(state, json) do
case Socket.send(state, [json, "\n"]) do
:ok ->
{:ok, state}
{:error, :closed} ->
send_with_new_socket(state, json)
{:error, reason} ->
fallback_log(state, json, reason)

{:ok, state}
end
end

defp send_with_new_socket(state, json) do
case configure_socket(state) do
{:error, reason} ->
fallback_log(state, json, reason)

{:ok, state}
{:ok, new_state} ->
send_with_retry(new_state, json)
end
end

defp url(%__MODULE__{host: host, port: port, protocol: protocol}) do
"#{protocol}://#{host}:#{port}"
end
end
50 changes: 50 additions & 0 deletions lib/logger_logstash_backend/socket.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
defmodule LoggerLogstashBackend.Socket do
@moduledoc """
Hides differences between `:udp` and `:tcp` sockets, so that protocol can be chosen in configuration
"""

# Types

@type protocol :: :tcp | :udp

# Functions

@doc """
Closes the `protocol` `socket`
"""
@spec close(map) :: :ok

def close(%{socket: socket}), do: :inet.close(socket)

@doc """
Opens a `protocol` socket using [`:gen_tcp.connect/3`](http://erlang.org/doc/man/gen_tcp.html#connect-3) or
[`:gen_udp.open/1`](http://erlang.org/doc/man/gen_udp.html#open-1).
"""
@spec open(map) :: {:ok, map} | {:error, :inet.posix}

def open(map = %{host: host, port: port, protocol: :tcp}) do
host
|> :gen_tcp.connect(port, [{:active, true}, :binary, {:keepalive, true}])
|> put_socket(map)
end

def open(map = %{protocol: :udp}) do
0
|> :gen_udp.open
|> put_socket(map)
end

@doc """
Sends a `protocol` message over `socket`.
"""
@spec send(map, iodata) :: :ok | {:error, :closed | :not_owner | :inet.posix}
def send(%{protocol: :tcp, socket: socket}, packet), do: :gen_tcp.send(socket, packet)
def send(%{host: host, port: port, protocol: :udp, socket: socket}, packet) do
:gen_udp.send(socket, host, port, packet)
end

## Private Functions

defp put_socket({:ok, socket}, map), do: {:ok, Map.put(map, :socket, socket)}
defp put_socket(other, _), do: other
end
Loading