11package cmd
22
33import (
4- "fmt"
54 "net/http"
65 "os"
76 "os/signal"
87 "syscall"
9- "time"
108
119 "github.com/pkg/errors"
1210 "github.com/prometheus/client_golang/prometheus/promhttp"
@@ -16,8 +14,6 @@ import (
1614 "github.com/brocaar/lora-gateway-bridge/internal/backend/mqtt"
1715 "github.com/brocaar/lora-gateway-bridge/internal/config"
1816 "github.com/brocaar/lora-gateway-bridge/internal/gateway/semtech"
19- "github.com/brocaar/lora-gateway-bridge/internal/legacy/backend/mqttpubsub"
20- "github.com/brocaar/lora-gateway-bridge/internal/legacy/gateway"
2117 "github.com/brocaar/loraserver/api/gw"
2218 "github.com/brocaar/lorawan"
2319)
@@ -57,93 +53,9 @@ func run(cmd *cobra.Command, args []string) error {
5753 }()
5854 }()
5955
60- if config .C .Backend .MQTT .Marshaler == "v2_json" {
61- if config .C .Backend .MQTT .Auth .Type != "generic" {
62- return fmt .Errorf ("auth type '%s' can not be used with 'v2_json' marshaler" , config .C .Backend .MQTT .Auth .Type )
63- }
64-
65- return runV2 (cmd , args )
66- }
6756 return runV3 (cmd , args )
6857}
6958
70- func runV2 (cmd * cobra.Command , args []string ) error {
71- var pubsub * mqttpubsub.Backend
72- for {
73- var err error
74- pubsub , err = mqttpubsub .NewBackend (config .C .Backend .MQTT )
75- if err == nil {
76- break
77- }
78-
79- log .Errorf ("could not setup mqtt backend, retry in 2 seconds: %s" , err )
80- time .Sleep (2 * time .Second )
81- }
82- defer pubsub .Close ()
83-
84- onNew := func (mac lorawan.EUI64 ) error {
85- return pubsub .SubscribeGatewayTopics (mac )
86- }
87-
88- onDelete := func (mac lorawan.EUI64 ) error {
89- return pubsub .UnSubscribeGatewayTopics (mac )
90- }
91-
92- gateway , err := gateway .NewBackend (config .C .PacketForwarder .UDPBind , onNew , onDelete , config .C .PacketForwarder .SkipCRCCheck , config .C .PacketForwarder .Configuration )
93- if err != nil {
94- log .Fatalf ("could not setup gateway backend: %s" , err )
95- }
96- defer gateway .Close ()
97-
98- go func () {
99- for rxPacket := range gateway .RXPacketChan () {
100- go func (rxPacket gw.RXPacketBytes ) {
101- if err := pubsub .PublishGatewayRX (rxPacket .RXInfo .MAC , rxPacket ); err != nil {
102- log .WithError (err ).Error ("publish uplink message error" )
103- }
104- }(rxPacket )
105- }
106- }()
107-
108- go func () {
109- for stats := range gateway .StatsChan () {
110- if err := pubsub .PublishGatewayStats (stats .MAC , stats ); err != nil {
111- log .WithError (err ).Error ("publish gateway stats message error" )
112- }
113- }
114- }()
115-
116- go func () {
117- for txPacket := range pubsub .TXPacketChan () {
118- if err := gateway .Send (txPacket ); err != nil {
119- log .WithError (err ).Error ("send downlink packet error" )
120- }
121- }
122- }()
123-
124- go func () {
125- for txAck := range gateway .TXAckChan () {
126- if err := pubsub .PublishGatewayTXAck (txAck .MAC , txAck ); err != nil {
127- log .WithError (err ).Error ("publish downlink ack error" )
128- }
129- }
130- }()
131-
132- go func () {
133- for configPacket := range pubsub .ConfigPacketChan () {
134- if err := gateway .ApplyConfiguration (configPacket ); err != nil {
135- log .WithError (err ).Error ("apply configuration error" )
136- }
137- }
138- }()
139-
140- sigChan := make (chan os.Signal )
141- signal .Notify (sigChan , os .Interrupt , syscall .SIGTERM )
142- log .WithField ("signal" , <- sigChan ).Info ("signal received" )
143- log .Warning ("shutting down server" )
144- return nil
145- }
146-
14759func runV3 (cmd * cobra.Command , args []string ) error {
14860 backend , err := mqtt .NewBackend (config .C .Backend .MQTT )
14961 if err != nil {
0 commit comments