Skip to content

Commit cbd9303

Browse files
committed
Allow tcp or udp sockets
1 parent b473cde commit cbd9303

File tree

4 files changed

+275
-54
lines changed

4 files changed

+275
-54
lines changed

lib/logger_logstash_backend.ex

Lines changed: 74 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,33 +14,69 @@
1414
# limitations under the License.
1515
################################################################################
1616
defmodule LoggerLogstashBackend do
17+
alias LoggerLogstashBackend.Socket
18+
1719
use GenEvent
1820
use Timex
1921

22+
# Struct
23+
24+
defstruct ~w(
25+
host
26+
level
27+
metadata
28+
name
29+
port
30+
protocol
31+
socket
32+
type
33+
)a
34+
35+
# Functions
36+
37+
## GenEvent callbacks
38+
2039
def init({__MODULE__, name}) do
21-
{:ok, configure(name, [])}
40+
# trap exits, so that cleanup can occur in terminate/2
41+
Process.flag(:trap_exit, true)
42+
43+
configure(name, [])
2244
end
2345

2446
def handle_call({:configure, opts}, %{name: name}) do
25-
{:ok, :ok, configure(name, opts)}
47+
case configure(name, opts) do
48+
{:ok, state} ->
49+
{:ok, :ok, state}
50+
reply = {:error, _reason} ->
51+
{:remove_handler, reply}
52+
end
2653
end
2754

2855
def handle_event(
2956
{level, _gl, {Logger, msg, ts, md}}, %{level: min_level} = state
3057
) do
3158
if is_nil(min_level) or Logger.compare_levels(level, min_level) != :lt do
3259
log_event level, msg, ts, md, state
60+
else
61+
{:ok, state}
3362
end
34-
{:ok, state}
3563
end
3664

65+
@doc """
66+
Closes socket when the backend is removed
67+
"""
68+
def terminate(_, state) do
69+
:ok = Socket.close(state)
70+
71+
state
72+
end
73+
74+
## Private Functions
75+
3776
defp log_event(
38-
level, msg, ts, md, %{
39-
host: host,
40-
port: port,
77+
level, msg, ts, md, state = %{
4178
type: type,
42-
metadata: metadata,
43-
socket: socket
79+
metadata: metadata
4480
}
4581
) do
4682
fields = md
@@ -56,10 +92,19 @@ defmodule LoggerLogstashBackend do
5692
message: to_string(msg),
5793
fields: fields
5894
}
59-
:gen_udp.send socket, host, port, to_char_list(json)
95+
96+
with {:error, reason} <- Socket.send(state, [json, "\n"]) do
97+
# fallback in case TCP configuration is bad
98+
IO.puts :stderr,
99+
"Could not log message (#{json}) to socket (#{url(state)}) due to #{inspect reason}. " <>
100+
"Check that state (#{inspect state}) is correct."
101+
end
102+
103+
104+
{:ok, state}
60105
end
61106

62-
defp configure(name, opts) do
107+
defp configure(name, opts) when is_atom(name) and is_list(opts) do
63108
env = Application.get_env :logger, name, []
64109
opts = Keyword.merge env, opts
65110
Application.put_env :logger, name, opts
@@ -69,18 +114,23 @@ defmodule LoggerLogstashBackend do
69114
type = Keyword.get opts, :type, "elixir"
70115
host = Keyword.get opts, :host
71116
port = Keyword.get opts, :port
72-
{:ok, socket} = :gen_udp.open 0
73-
%{
74-
name: name,
75-
host: to_char_list(host),
76-
port: port,
77-
level: level,
78-
socket: socket,
79-
type: type,
80-
metadata: metadata
81-
}
117+
protocol = Keyword.get opts, :protocol, :udp
118+
119+
state = %__MODULE__{level: level, metadata: metadata, name: name, protocol: protocol, type: type}
120+
121+
configure_socket(state, host, port)
82122
end
83123

124+
defp configure_socket(state, host, port) when is_nil(host) or is_nil(port), do: {:ok, state}
125+
defp configure_socket(state, host, port) do
126+
with reply = {:error, reason} <- Socket.open %{state | host: to_char_list(host), port: port} do
127+
IO.puts :stderr, "Could not open socket (#{url(state)}) due to reason #{inspect reason}"
128+
129+
reply
130+
end
131+
end
132+
133+
84134
# inspects the argument only if it is a pid
85135
defp inspect_pid(pid) when is_pid(pid), do: inspect(pid)
86136
defp inspect_pid(other), do: other
@@ -91,4 +141,8 @@ defmodule LoggerLogstashBackend do
91141
{key, inspect_pid(value)}
92142
end
93143
end
144+
145+
defp url(%__MODULE__{host: host, port: port, protocol: protocol}) do
146+
"#{protocol}://#{host}:#{port}"
147+
end
94148
end
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
defmodule LoggerLogstashBackend.Socket do
2+
@moduledoc """
3+
Hides differences between `:udp` and `:tcp` sockets, so that protocol can be chosen in configuration
4+
"""
5+
6+
# Types
7+
8+
@type protocol :: :tcp | :udp
9+
10+
# Functions
11+
12+
@doc """
13+
Closes the `protocol` `socket`
14+
"""
15+
@spec close(map) :: :ok
16+
17+
def close(%{socket: socket}), do: :inet.close(socket)
18+
19+
@doc """
20+
Opens a `protocol` socket using [`:gen_tcp.connect/3`](http://erlang.org/doc/man/gen_tcp.html#connect-3) or
21+
[`:gen_udp.open/1`](http://erlang.org/doc/man/gen_udp.html#open-1).
22+
"""
23+
@spec open(map) :: {:ok, map} | {:error, :inet.posix}
24+
25+
def open(map = %{host: host, port: port, protocol: :tcp}) do
26+
host
27+
|> :gen_tcp.connect(port, [{:active, true}, :binary, {:keepalive, true}])
28+
|> put_socket(map)
29+
end
30+
31+
def open(map = %{protocol: :udp}) do
32+
0
33+
|> :gen_udp.open
34+
|> put_socket(map)
35+
end
36+
37+
@doc """
38+
Sends a `protocol` message over `socket`.
39+
"""
40+
@spec send(map, iodata) :: :ok | {:error, :closed | :not_owner | :inet.posix}
41+
def send(%{protocol: :tcp, socket: socket}, packet), do: :gen_tcp.send(socket, packet)
42+
def send(%{host: host, port: port, protocol: :udp, socket: socket}, packet) do
43+
:gen_udp.send(socket, host, port, packet)
44+
end
45+
46+
## Private Functions
47+
48+
defp put_socket({:ok, socket}, map), do: {:ok, Map.put(map, :socket, socket)}
49+
defp put_socket(other, _), do: other
50+
end
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
################################################################################
2+
# Copyright 2015 Marcelo Gornstein <marcelog@gmail.com>
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
################################################################################
16+
defmodule LoggerLogstashBackend.TCPTest do
17+
use ExUnit.Case, async: false
18+
require Logger
19+
use Timex
20+
21+
@backend {LoggerLogstashBackend, :tcp_test}
22+
23+
setup context = %{line: line} do
24+
# have to open socket before configure_backend, so that it is listening when connect happens
25+
{:ok, listen_socket} = :gen_tcp.listen 0, [:binary, {:active, true}, {:ip, {127, 0, 0, 1}}, {:reuseaddr, true}]
26+
{:ok, port} = :inet.port(listen_socket)
27+
28+
backend = {LoggerLogstashBackend, String.to_atom("#{inspect __MODULE__}#{line}")}
29+
30+
Logger.add_backend backend
31+
Logger.configure_backend backend, [
32+
host: "127.0.0.1",
33+
port: port,
34+
level: :info,
35+
type: "some_app",
36+
metadata: [
37+
some_metadata: "go here"
38+
],
39+
protocol: :tcp
40+
]
41+
42+
{:ok, accept_socket} = :gen_tcp.accept(listen_socket, 1_000)
43+
44+
on_exit fn ->
45+
Logger.remove_backend backend
46+
:ok = :gen_tcp.close accept_socket
47+
:ok = :gen_tcp.close listen_socket
48+
end
49+
50+
{:ok, context}
51+
end
52+
53+
test "can log" do
54+
Logger.info "hello world", [key1: "field1"]
55+
56+
assert {:ok, data} = JSX.decode get_log!
57+
assert data["type"] === "some_app"
58+
assert data["message"] === "hello world"
59+
60+
fields = data["fields"]
61+
62+
assert fields["function"] == "test can log/1"
63+
assert fields["key1"] == "field1"
64+
assert fields["level"] == "info"
65+
assert fields["line"] == 54
66+
assert fields["module"] == to_string(__MODULE__)
67+
assert fields["pid"] == inspect(self)
68+
assert fields["some_metadata"] == "go here"
69+
70+
{:ok, ts} = Timex.parse data["@timestamp"], "%FT%T%z", :strftime
71+
ts = Timex.to_unix ts
72+
73+
now = Timex.to_unix Timex.DateTime.local
74+
assert (now - ts) < 1000
75+
end
76+
77+
test "can log pids" do
78+
Logger.info "pid", [pid_key: self]
79+
80+
{:ok, data} = JSX.decode get_log!
81+
assert data["type"] === "some_app"
82+
assert data["message"] === "pid"
83+
84+
fields = data["fields"]
85+
86+
assert fields["function"] == "test can log pids/1"
87+
assert fields["pid_key"] == inspect(self)
88+
assert fields["level"] == "info"
89+
assert fields["line"] == 78
90+
assert fields["module"] == to_string(__MODULE__)
91+
assert fields["pid"] == inspect(self)
92+
assert fields["some_metadata"] == "go here"
93+
94+
{:ok, ts} = Timex.parse data["@timestamp"], "%FT%T%z", :strftime
95+
ts = Timex.to_unix ts
96+
97+
now = Timex.to_unix Timex.DateTime.local
98+
assert (now - ts) < 1000
99+
end
100+
101+
test "cant log when minor levels" do
102+
Logger.debug "hello world", [key1: "field1"]
103+
{:error, :nothing_received} = get_log
104+
end
105+
106+
defp get_log do
107+
receive do
108+
{:tcp, _socket, json} -> {:ok, json}
109+
after 500 -> {:error, :nothing_received}
110+
end
111+
end
112+
113+
defp get_log! do
114+
{:ok, log} = get_log
115+
116+
log
117+
end
118+
end

0 commit comments

Comments
 (0)