Skip to content

Commit 6f41a8c

Browse files
authored
Merge pull request #574 from GilGil1/master
Custom open connection for different network types. This allows the user to provide a custom function which will be used to open the network connection enabling support for edge cases not supported by the inbuilt function.
2 parents aa9a7a1 + b7ff17c commit 6f41a8c

File tree

4 files changed

+125
-1
lines changed

4 files changed

+125
-1
lines changed

client.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,11 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
393393
tlsCfg = c.options.OnConnectAttempt(broker, c.options.TLSConfig)
394394
}
395395
// Start by opening the network connection (tcp, tls, ws) etc
396-
conn, err = openConnection(broker, tlsCfg, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions, c.options.Dialer)
396+
if c.options.CustomOpenConnectionFn != nil{
397+
conn, err = c.options.CustomOpenConnectionFn(broker, c.options)
398+
} else {
399+
conn, err = openConnection(broker, tlsCfg, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions, c.options.Dialer)
400+
}
397401
if err != nil {
398402
ERROR.Println(CLI, err.Error())
399403
WARN.Println(CLI, "failed to connect to broker, trying next")

client_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright (c) 2021 IBM Corp and others.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Public License v2.0
6+
* and Eclipse Distribution License v1.0 which accompany this distribution.
7+
*
8+
* The Eclipse Public License is available at
9+
* https://www.eclipse.org/legal/epl-2.0/
10+
* and the Eclipse Distribution License is available at
11+
* http://www.eclipse.org/org/documents/edl-v10.php.
12+
*
13+
* Contributors:
14+
* Seth Hoenig
15+
* Allan Stockdill-Mander
16+
* Mike Robertson
17+
* Matt Brittan
18+
*/
19+
package mqtt
20+
21+
import (
22+
"net"
23+
"net/url"
24+
"strings"
25+
"testing"
26+
"time"
27+
)
28+
29+
func TestCustomConnectionFunction(t *testing.T) {
30+
// Set netpipe to emu
31+
netClient, netServer := net.Pipe()
32+
defer netClient.Close()
33+
defer netServer.Close()
34+
var firstMessage = ""
35+
go func() {
36+
// read first message only
37+
bytes := make([]byte, 1024)
38+
n, err := netServer.Read(bytes)
39+
if err != nil {
40+
t.Errorf("%v", err)
41+
}
42+
firstMessage = string(bytes[:n])
43+
}()
44+
// Set custom network connection function and client connect
45+
var customConnectionFunc OpenConnectionFunc = func(uri *url.URL, options ClientOptions) (net.Conn, error) {
46+
return netClient, nil
47+
}
48+
options := &ClientOptions{
49+
CustomOpenConnectionFn: customConnectionFunc,
50+
}
51+
brokerAddr := netServer.LocalAddr().Network()
52+
options.AddBroker(brokerAddr)
53+
client := NewClient(options)
54+
55+
// Try to connect using custom function, wait for 2 seconds, to pass MQTT first message
56+
if token := client.Connect(); token.WaitTimeout(2*time.Second) && token.Error() != nil {
57+
t.Errorf("%v", token.Error())
58+
}
59+
60+
// Analyze first message sent by client and received by the server
61+
if len(firstMessage) <= 0 || !strings.Contains(firstMessage, "MQTT") {
62+
t.Error("no message recieved on connect")
63+
}
64+
}

options.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ type ReconnectHandler func(Client, *ClientOptions)
5757
// ConnectionAttemptHandler is invoked prior to making the initial connection.
5858
type ConnectionAttemptHandler func(broker *url.URL, tlsCfg *tls.Config) *tls.Config
5959

60+
// OpenConnectionFunc is invoked to establish the underlying network connection
61+
// Its purpose if for custom network transports.
62+
// Does not carry out any MQTT specific handshakes.
63+
type OpenConnectionFunc func(uri *url.URL, options ClientOptions) (net.Conn, error)
64+
6065
// ClientOptions contains configurable options for an Client. Note that these should be set using the
6166
// relevant methods (e.g. AddBroker) rather than directly. See those functions for information on usage.
6267
// WARNING: Create the below using NewClientOptions unless you have a compelling reason not to. It is easy
@@ -98,6 +103,7 @@ type ClientOptions struct {
98103
WebsocketOptions *WebsocketOptions
99104
MaxResumePubInFlight int // // 0 = no limit; otherwise this is the maximum simultaneous messages sent while resuming
100105
Dialer *net.Dialer
106+
CustomOpenConnectionFn OpenConnectionFunc
101107
}
102108

103109
// NewClientOptions will create a new ClientClientOptions type with some
@@ -140,6 +146,7 @@ func NewClientOptions() *ClientOptions {
140146
HTTPHeaders: make(map[string][]string),
141147
WebsocketOptions: &WebsocketOptions{},
142148
Dialer: &net.Dialer{Timeout: 30 * time.Second},
149+
CustomOpenConnectionFn: nil,
143150
}
144151
return o
145152
}
@@ -429,3 +436,13 @@ func (o *ClientOptions) SetDialer(dialer *net.Dialer) *ClientOptions {
429436
o.Dialer = dialer
430437
return o
431438
}
439+
440+
// SetCustomOpenConectionFn replaces the inbuilt function that establishes a network connection with a custom function.
441+
// The passed in function should return an open `net.Conn` or an error (see the existing openConnection function for an example)
442+
// It enables custom networking types in addition to the defaults (tcp, tls, websockets...)
443+
func (o *ClientOptions) SetCustomOpenConectionFn(customOpenConnectionfn OpenConnectionFunc) *ClientOptions {
444+
if customOpenConnectionfn != nil {
445+
o.CustomOpenConnectionFn = customOpenConnectionfn
446+
}
447+
return o
448+
}

options_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright (c) 2021 IBM Corp and others.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Public License v2.0
6+
* and Eclipse Distribution License v1.0 which accompany this distribution.
7+
*
8+
* The Eclipse Public License is available at
9+
* https://www.eclipse.org/legal/epl-2.0/
10+
* and the Eclipse Distribution License is available at
11+
* http://www.eclipse.org/org/documents/edl-v10.php.
12+
*
13+
* Contributors:
14+
* Seth Hoenig
15+
* Allan Stockdill-Mander
16+
* Mike Robertson
17+
* Måns Ansgariusson
18+
*/
19+
20+
// Portions copyright © 2018 TIBCO Software Inc.
21+
package mqtt
22+
23+
import (
24+
"fmt"
25+
"net"
26+
"net/url"
27+
"testing"
28+
)
29+
30+
func TestSetCustomConnectionOptions(t *testing.T) {
31+
var customConnectionFunc OpenConnectionFunc = func(uri *url.URL, options ClientOptions) (net.Conn, error) {
32+
return nil, fmt.Errorf("not implemented open connection func")
33+
}
34+
options := &ClientOptions{}
35+
options = options.SetCustomOpenConectionFn(customConnectionFunc)
36+
if options.CustomOpenConnectionFn == nil {
37+
t.Error("custom open connection function cannot be set")
38+
}
39+
}

0 commit comments

Comments
 (0)