Skip to content

Commit 271a1d6

Browse files
committed
wip: Use actor package
1 parent 226dc4e commit 271a1d6

File tree

6 files changed

+137
-32
lines changed

6 files changed

+137
-32
lines changed

go.mod

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,9 @@ require (
6666

6767
require (
6868
dario.cat/mergo v1.0.1 // indirect
69-
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
70-
github.com/Microsoft/go-winio v0.6.1 // indirect
71-
github.com/NebulousLabs/fastrand v0.0.0-20181203155948-6fb6489aac4e // indirect
72-
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
7369
github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da // indirect
7470
github.com/aead/siphash v1.0.1 // indirect
71+
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
7572
github.com/beorn7/perks v1.0.1 // indirect
7673
github.com/btcsuite/btcwallet/wallet/txsizes v1.2.5 // indirect
7774
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd // indirect
@@ -126,14 +123,18 @@ require (
126123
github.com/klauspost/compress v1.17.9
127124
github.com/lib/pq v1.10.9 // indirect
128125
github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf // indirect
126+
github.com/lightningnetwork/lnd/actor v0.0.3 // indirect
129127
github.com/mattn/go-isatty v0.0.20 // indirect
130128
github.com/mattn/go-runewidth v0.0.13 // indirect
131129
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
130+
github.com/Microsoft/go-winio v0.6.1 // indirect
132131
github.com/moby/docker-image-spec v1.3.1 // indirect
133132
github.com/moby/term v0.5.0 // indirect
134133
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
135134
github.com/modern-go/reflect2 v1.0.1 // indirect
136135
github.com/ncruces/go-strftime v0.1.9 // indirect
136+
github.com/NebulousLabs/fastrand v0.0.0-20181203155948-6fb6489aac4e // indirect
137+
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
137138
github.com/nxadm/tail v1.4.8 // indirect
138139
github.com/onsi/gomega v1.26.0 // indirect
139140
github.com/opencontainers/go-digest v1.0.0 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,8 @@ github.com/lightninglabs/neutrino/cache v1.1.2 h1:C9DY/DAPaPxbFC+xNNEI/z1SJY9GS3
370370
github.com/lightninglabs/neutrino/cache v1.1.2/go.mod h1:XJNcgdOw1LQnanGjw8Vj44CvguYA25IMKjWFZczwZuo=
371371
github.com/lightninglabs/protobuf-go-hex-display v1.33.0-hex-display h1:Y2WiPkBS/00EiEg0qp0FhehxnQfk3vv8U6Xt3nN+rTY=
372372
github.com/lightninglabs/protobuf-go-hex-display v1.33.0-hex-display/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
373+
github.com/lightningnetwork/lnd/actor v0.0.3 h1:m+v0UuHdEUg4S82J4aC40pGonScckzsu1K9sdZeAo5Y=
374+
github.com/lightningnetwork/lnd/actor v0.0.3/go.mod h1:RKgQPYHLVHuIRdMF/Q8+NPV2Nva3F66ZsULD2+TskKc=
373375
github.com/lightningnetwork/lnd/cert v1.2.2 h1:71YK6hogeJtxSxw2teq3eGeuy4rHGKcFf0d0Uy4qBjI=
374376
github.com/lightningnetwork/lnd/cert v1.2.2/go.mod h1:jQmFn/Ez4zhDgq2hnYSw8r35bqGVxViXhX6Cd7HXM6U=
375377
github.com/lightningnetwork/lnd/clock v1.1.1 h1:OfR3/zcJd2RhH0RU+zX/77c0ZiOnIMsDIBjgjWdZgA0=

msgmux/msg_router.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ type Endpoint interface {
5050
SendMessage(ctx context.Context, msg PeerMsg) bool
5151
}
5252

53-
// MsgRouter is an interface that represents a message router, which is generic
53+
// Router is an interface that represents a message router, which is generic
5454
// sub-system capable of routing any incoming wire message to a set of
5555
// registered endpoints.
5656
type Router interface {

onionmessage/onion_endpoint.go

Lines changed: 85 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010

1111
"github.com/btcsuite/btcd/btcec/v2"
1212
"github.com/btcsuite/btclog/v2"
13+
"github.com/lightningnetwork/lnd/actor"
14+
"github.com/lightningnetwork/lnd/fn/v2"
1315
"github.com/lightningnetwork/lnd/htlcswitch/hop"
1416
"github.com/lightningnetwork/lnd/lnutils"
1517
"github.com/lightningnetwork/lnd/lnwire"
@@ -48,11 +50,64 @@ var (
4850
ErrNoNextNodeID = errors.New("next node ID required")
4951
)
5052

51-
// OnionMessageSender is a function type that defines how to send an onion
52-
// message. It takes the next node's public key (as [33]byte), the blinding
53-
// point (*btcec.PublicKey), and the onion packet ([]byte) to send to a peer.
54-
type OnionMessageSender func(context.Context, [33]byte, *btcec.PublicKey,
55-
[]byte) error
53+
type OMRequest struct {
54+
// Embed BaseMessage to satisfy the Message interface.
55+
actor.BaseMessage
56+
msg lnwire.OnionMessage
57+
}
58+
59+
// MessageType returns a string identifier for this message type.
60+
func (m *OMRequest) MessageType() string {
61+
return "OnionMessageRequest"
62+
}
63+
64+
// MyResponse might be a corresponding response type.
65+
type OMResponse struct {
66+
actor.BaseMessage
67+
Success bool
68+
}
69+
70+
func (m *OMResponse) MessageType() string {
71+
return "OnionMessageResponse"
72+
}
73+
74+
type OnionPeerActorRef actor.ActorRef[*OMRequest, *OMResponse]
75+
76+
// PeerOnionSenderKey returns the actor key used to send onion messages to a
77+
// specific peer identified by their public key.
78+
func SpawnOnionPeerActor(system *actor.ActorSystem,
79+
sender func(msg *lnwire.OnionMessage),
80+
pubKey [33]byte) OnionPeerActorRef {
81+
82+
// The actor logic creates a function behavior that sends onion messages
83+
// using the provided sender function.
84+
actorLogic := func(ctx context.Context,
85+
req *OMRequest) fn.Result[*OMResponse] {
86+
87+
select {
88+
case <-ctx.Done():
89+
return fn.Err[*OMResponse](
90+
errors.New("actor shutting down"),
91+
)
92+
default:
93+
}
94+
95+
sender(&req.msg)
96+
response := &OMResponse{Success: true}
97+
return fn.Ok(response)
98+
}
99+
100+
// Create a behavior from the function.
101+
behavior := actor.NewFunctionBehavior(actorLogic)
102+
103+
pubKeyHex := hex.EncodeToString(pubKey[:])
104+
serviceKey := actor.NewServiceKey[*OMRequest, *OMResponse](pubKeyHex)
105+
actorRef := serviceKey.Spawn(
106+
system, "onion-peer-actor-"+pubKeyHex, behavior,
107+
)
108+
109+
return actorRef
110+
}
56111

57112
// OnionMessageUpdate is onion message update dispatched to any potential
58113
// subscriber.
@@ -89,13 +144,6 @@ func WithMessageServer(server *subscribe.Server) OnionEndpointOption {
89144
}
90145
}
91146

92-
// WithMessageSender sets the onion message sender for the OnionEndpoint.
93-
func WithMessageSender(msgSender OnionMessageSender) OnionEndpointOption {
94-
return func(o *OnionEndpoint) {
95-
o.MsgSender = msgSender
96-
}
97-
}
98-
99147
// WithOnionProcessor sets the onion processor for the OnionEndpoint.
100148
func WithOnionProcessor(processor *hop.OnionProcessor) OnionEndpointOption {
101149
return func(o *OnionEndpoint) {
@@ -111,18 +159,19 @@ type OnionEndpoint struct {
111159
// onionProcessor is the onion processor used to process onion packets.
112160
onionProcessor *hop.OnionProcessor
113161

114-
// MsgSender sends a onion message to the target peer.
115-
MsgSender OnionMessageSender
162+
// receptionist is the actor system receptionist used to look up peer
163+
// onion actors.
164+
receptionist *actor.Receptionist
116165
}
117166

118167
// A compile-time check to ensure OnionEndpoint implements the Endpoint
119168
// interface.
120169
var _ msgmux.Endpoint = (*OnionEndpoint)(nil)
121170

122171
// NewOnionEndpoint creates a new OnionEndpoint with the given options.
123-
func NewOnionEndpoint(opts ...OnionEndpointOption) *OnionEndpoint {
172+
func NewOnionEndpoint(receptionist *actor.Receptionist, opts ...OnionEndpointOption) *OnionEndpoint {
124173
o := &OnionEndpoint{
125-
onionMessageServer: nil,
174+
receptionist: receptionist,
126175
}
127176
for _, opt := range opts {
128177
opt(o)
@@ -283,12 +332,26 @@ func (o *OnionEndpoint) forwardMessage(ctx context.Context,
283332
var nextNodeIDBytes [33]byte
284333
copy(nextNodeIDBytes[:], nextNodeID.SerializeCompressed())
285334

286-
err := o.MsgSender(
287-
ctx, nextNodeIDBytes, nextBlindingPoint, nextPacket,
288-
)
289-
290-
if err != nil {
291-
return fmt.Errorf("could not send message: %w", err)
335+
// err := o.MsgSender(
336+
// ctx, nextNodeIDBytes, nextBlindingPoint, nextPacket,
337+
// )
338+
339+
// Find the onion peer actor for the next node ID.
340+
pubKeyHex := hex.EncodeToString(nextNodeIDBytes[:])
341+
serviceKey := actor.NewServiceKey[*OMRequest, *OMResponse](pubKeyHex)
342+
foundActorRefs := actor.FindInReceptionist(o.receptionist, serviceKey)
343+
344+
// If we found an actor, send the onion message to it.
345+
if len(foundActorRefs) > 0 {
346+
actorRef := foundActorRefs[0]
347+
onionMsg := lnwire.NewOnionMessage(
348+
nextBlindingPoint, nextPacket,
349+
)
350+
req := &OMRequest{msg: *onionMsg}
351+
actorRef.Tell(ctx, req)
352+
} else {
353+
return fmt.Errorf("No actors found for service key: %v",
354+
serviceKey)
292355
}
293356

294357
return nil

peer/brontide.go

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/btcsuite/btcd/txscript"
2020
"github.com/btcsuite/btcd/wire"
2121
"github.com/btcsuite/btclog/v2"
22+
"github.com/lightningnetwork/lnd/actor"
2223
"github.com/lightningnetwork/lnd/aliasmgr"
2324
"github.com/lightningnetwork/lnd/brontide"
2425
"github.com/lightningnetwork/lnd/buffer"
@@ -468,9 +469,8 @@ type Config struct {
468469
// onion messages to subscribers.
469470
OnionMessageServer *subscribe.Server
470471

471-
// OnionMsgSender is a function that sends an onion message to any peer.
472-
OnionMsgSender func(context.Context, [33]byte, *btcec.PublicKey,
473-
[]byte) error
472+
// ActorSystem is the server wide actor system.
473+
ActorSystem *actor.ActorSystem
474474

475475
// ShouldFwdExpEndorsement is a closure that indicates whether
476476
// experimental endorsement signals should be set.
@@ -645,6 +645,11 @@ type Brontide struct {
645645
// when a peer disconnects.
646646
globalMsgRouter bool
647647

648+
// onionPeerActorRef is an optional actor ref that points to the onion
649+
// actor created for this peer **only if** the remote peer supports
650+
// onion messaging.
651+
onionPeerActorRef fn.Option[onionmessage.OnionPeerActorRef]
652+
648653
startReady chan struct{}
649654

650655
// cg is a helper that encapsulates a wait group and quit channel and
@@ -907,10 +912,28 @@ func (p *Brontide) Start() error {
907912
return fmt.Errorf("unable to load channels: %w", err)
908913
}
909914

915+
// If the remote peer supports onion messages, then we'll spawn the
916+
// onion peer actor, which will be used to send onion messages **to**
917+
// the remote peer.
918+
if p.remoteFeatures.HasFeature(lnwire.OnionMessagesOptional) {
919+
p.log.Infof("Remote peer supports onion messages, " +
920+
"registering onion message actor")
921+
sender := func(msg *lnwire.OnionMessage) {
922+
p.SendMessageLazy(false, msg)
923+
}
924+
onionPeerActorRef := onionmessage.SpawnOnionPeerActor(
925+
p.cfg.ActorSystem, sender, p.cfg.PubKeyBytes,
926+
)
927+
p.onionPeerActorRef = fn.Some(onionPeerActorRef)
928+
}
929+
930+
// The onion message endpoint is used to handle incoming onion messages
931+
// **from** this peer. This uses the message multiplexer to route
932+
// messages to the endpoint for further processing.
910933
onionMessageEndpoint := onionmessage.NewOnionEndpoint(
934+
p.cfg.ActorSystem.Receptionist(),
911935
onionmessage.WithMessageServer(p.cfg.OnionMessageServer),
912936
onionmessage.WithOnionProcessor(p.cfg.Sphinx),
913-
onionmessage.WithMessageSender(p.cfg.OnionMsgSender),
914937
)
915938

916939
// We register the onion message endpoint with the message router.
@@ -1677,6 +1700,12 @@ func (p *Brontide) Disconnect(reason error) {
16771700
router.Stop()
16781701
})
16791702
}
1703+
1704+
// If we have an onion peer actor, stop and remove it from the actor
1705+
// system.
1706+
p.onionPeerActorRef.WhenSome(func(ref onionmessage.OnionPeerActorRef) {
1707+
p.cfg.ActorSystem.StopAndRemoveActor(ref.ID())
1708+
})
16801709
}
16811710

16821711
// String returns the string representation of this peer.

server.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/btcsuite/btcd/wire"
2828
"github.com/btcsuite/btclog/v2"
2929
sphinx "github.com/lightningnetwork/lightning-onion"
30+
"github.com/lightningnetwork/lnd/actor"
3031
"github.com/lightningnetwork/lnd/aliasmgr"
3132
"github.com/lightningnetwork/lnd/autopilot"
3233
"github.com/lightningnetwork/lnd/brontide"
@@ -425,6 +426,10 @@ type server struct {
425426

426427
onionMessageServer *subscribe.Server
427428

429+
// actorSystem is the actor system tasked with handling actors that are
430+
// created for this server.
431+
actorSystem *actor.ActorSystem
432+
428433
// txPublisher is a publisher with fee-bumping capability.
429434
txPublisher *sweep.TxPublisher
430435

@@ -732,6 +737,8 @@ func newServer(ctx context.Context, cfg *Config, listenAddrs []net.Addr,
732737

733738
onionMessageServer: subscribe.NewServer(),
734739

740+
actorSystem: actor.NewActorSystem(),
741+
735742
tlsManager: tlsManager,
736743

737744
featureMgr: featureMgr,
@@ -2592,6 +2599,9 @@ func (s *server) Stop() error {
25922599
// Stop dispatching blocks to other systems immediately.
25932600
s.blockbeatDispatcher.Stop()
25942601

2602+
// Shutdown the actor system to stop all actors.
2603+
s.actorSystem.Shutdown()
2604+
25952605
// Shutdown the wallet, funding manager, and the rpc server.
25962606
if err := s.chanStatusMgr.Stop(); err != nil {
25972607
srvrLog.Warnf("failed to stop chanStatusMgr: %v", err)
@@ -4380,7 +4390,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
43804390
TowerClient: towerClient,
43814391
DisconnectPeer: s.DisconnectPeer,
43824392
OnionMessageServer: s.onionMessageServer,
4383-
OnionMsgSender: s.SendOnionMessage,
4393+
ActorSystem: s.actorSystem,
43844394
GenNodeAnnouncement: func(...netann.NodeAnnModifier) (
43854395
lnwire.NodeAnnouncement1, error) {
43864396

0 commit comments

Comments
 (0)