5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-12-23 11:05:40 +00:00

Merge pull request #1044 from yggdrasil-network/arc/linkfix

Fix duplicate connections
This commit is contained in:
Arceliar 2023-06-18 08:49:20 -05:00 committed by GitHub
commit 31177f5a73
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 257 additions and 241 deletions

View File

@ -6,8 +6,11 @@ import (
"fmt" "fmt"
"net" "net"
"net/url" "net/url"
"sync/atomic"
"time" "time"
"github.com/Arceliar/phony"
"github.com/Arceliar/ironwood/network" "github.com/Arceliar/ironwood/network"
"github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/address"
) )
@ -70,32 +73,30 @@ func (c *Core) GetPeers() []PeerInfo {
conns[p.Conn] = p conns[p.Conn] = p
} }
c.links.RLock() phony.Block(&c.links, func() {
defer c.links.RUnlock() for info, state := range c.links._links {
for info, state := range c.links._links { var peerinfo PeerInfo
var peerinfo PeerInfo var conn net.Conn
var conn net.Conn peerinfo.URI = info.uri
state.RLock() peerinfo.LastError = state._err
peerinfo.URI = info.uri peerinfo.LastErrorTime = state._errtime
peerinfo.LastError = state._err if c := state._conn; c != nil {
peerinfo.LastErrorTime = state._errtime conn = c
if c := state._conn; c != nil { peerinfo.Up = true
conn = c peerinfo.Inbound = state.linkType == linkTypeIncoming
peerinfo.Up = true peerinfo.RXBytes = atomic.LoadUint64(&c.rx)
peerinfo.Inbound = state.linkType == linkTypeIncoming peerinfo.TXBytes = atomic.LoadUint64(&c.tx)
peerinfo.RXBytes = c.rx peerinfo.Uptime = time.Since(c.up)
peerinfo.TXBytes = c.tx }
peerinfo.Uptime = time.Since(c.up) if p, ok := conns[conn]; ok {
peerinfo.Key = p.Key
peerinfo.Root = p.Root
peerinfo.Port = p.Port
peerinfo.Priority = p.Priority
}
peers = append(peers, peerinfo)
} }
state.RUnlock() })
if p, ok := conns[conn]; ok {
peerinfo.Key = p.Key
peerinfo.Root = p.Root
peerinfo.Port = p.Port
peerinfo.Priority = p.Priority
}
peers = append(peers, peerinfo)
}
return peers return peers
} }

View File

@ -13,7 +13,6 @@ import (
"net/url" "net/url"
"strconv" "strconv"
"strings" "strings"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -30,13 +29,14 @@ const (
) )
type links struct { type links struct {
core *Core phony.Inbox
tcp *linkTCP // TCP interface support core *Core
tls *linkTLS // TLS interface support tcp *linkTCP // TCP interface support
unix *linkUNIX // UNIX interface support tls *linkTLS // TLS interface support
socks *linkSOCKS // SOCKS interface support unix *linkUNIX // UNIX interface support
sync.RWMutex // Protects the below socks *linkSOCKS // SOCKS interface support
_links map[linkInfo]*link // *link is nil if connection in progress // _links can only be modified safely from within the links actor
_links map[linkInfo]*link // *link is nil if connection in progress
} }
type linkProtocol interface { type linkProtocol interface {
@ -52,13 +52,13 @@ type linkInfo struct {
// link tracks the state of a connection, either persistent or non-persistent // link tracks the state of a connection, either persistent or non-persistent
type link struct { type link struct {
kick chan struct{} // Attempt to reconnect now, if backing off kick chan struct{} // Attempt to reconnect now, if backing off
linkType linkType // Type of link, i.e. outbound/inbound, persistent/ephemeral linkType linkType // Type of link, i.e. outbound/inbound, persistent/ephemeral
linkProto string // Protocol carrier of link, e.g. TCP, AWDL linkProto string // Protocol carrier of link, e.g. TCP, AWDL
sync.RWMutex // Protects the below // The remaining fields can only be modified safely from within the links actor
_conn *linkConn // Connected link, if any, nil if not connected _conn *linkConn // Connected link, if any, nil if not connected
_err error // Last error on the connection, if any _err error // Last error on the connection, if any
_errtime time.Time // Last time an error occured _errtime time.Time // Last time an error occured
} }
type linkOptions struct { type linkOptions struct {
@ -131,183 +131,192 @@ const ErrLinkPinnedKeyInvalid = linkError("pinned public key is invalid")
const ErrLinkUnrecognisedSchema = linkError("link schema unknown") const ErrLinkUnrecognisedSchema = linkError("link schema unknown")
func (l *links) add(u *url.URL, sintf string, linkType linkType) error { func (l *links) add(u *url.URL, sintf string, linkType linkType) error {
// Generate the link info and see whether we think we already var retErr error
// have an open peering to this peer. phony.Block(l, func() {
lu := urlForLinkInfo(*u) // Generate the link info and see whether we think we already
info := linkInfo{ // have an open peering to this peer.
uri: lu.String(), lu := urlForLinkInfo(*u)
sintf: sintf, info := linkInfo{
} uri: lu.String(),
sintf: sintf,
// Collect together the link options, these are global options
// that are not specific to any given protocol.
var options linkOptions
for _, pubkey := range u.Query()["key"] {
sigPub, err := hex.DecodeString(pubkey)
if err != nil {
return ErrLinkPinnedKeyInvalid
} }
var sigPubKey keyArray
copy(sigPubKey[:], sigPub)
if options.pinnedEd25519Keys == nil {
options.pinnedEd25519Keys = map[keyArray]struct{}{}
}
options.pinnedEd25519Keys[sigPubKey] = struct{}{}
}
if p := u.Query().Get("priority"); p != "" {
pi, err := strconv.ParseUint(p, 10, 8)
if err != nil {
return ErrLinkPriorityInvalid
}
options.priority = uint8(pi)
}
// If we think we're already connected to this peer, load up // Collect together the link options, these are global options
// the existing peer state. Try to kick the peer if possible, // that are not specific to any given protocol.
// which will cause an immediate connection attempt if it is var options linkOptions
// backing off for some reason. for _, pubkey := range u.Query()["key"] {
l.Lock() sigPub, err := hex.DecodeString(pubkey)
state, ok := l._links[info]
if ok && state != nil {
select {
case state.kick <- struct{}{}:
default:
}
l.Unlock()
return ErrLinkAlreadyConfigured
}
// Create the link entry. This will contain the connection
// in progress (if any), any error details and a context that
// lets the link be cancelled later.
state = &link{
linkType: linkType,
linkProto: strings.ToUpper(u.Scheme),
kick: make(chan struct{}),
}
// Store the state of the link so that it can be queried later.
l._links[info] = state
l.Unlock()
// Track how many consecutive connection failures we have had,
// as we will back off exponentially rather than hammering the
// remote node endlessly.
var backoff int
// backoffNow is called when there's a connection error. It
// will wait for the specified amount of time and then return
// true, unless the peering context was cancelled (due to a
// peer removal most likely), in which case it returns false.
// The caller should check the return value to decide whether
// or not to give up trying.
backoffNow := func() bool {
backoff++
duration := time.Second * time.Duration(math.Exp2(float64(backoff)))
select {
case <-time.After(duration):
return true
case <-state.kick:
return true
case <-l.core.ctx.Done():
return false
}
}
// The goroutine is responsible for attempting the connection
// and then running the handler. If the connection is persistent
// then the loop will run endlessly, using backoffs as needed.
// Otherwise the loop will end, cleaning up the link entry.
go func() {
defer func() {
l.Lock()
defer l.Unlock()
delete(l._links, info)
}()
// This loop will run each and every time we want to attempt
// a connection to this peer.
for {
conn, err := l.connect(u, info, options)
if err != nil { if err != nil {
if linkType == linkTypePersistent { retErr = ErrLinkPinnedKeyInvalid
// If the link is a persistent configured peering, return
// store information about the connection error so }
// that we can report it through the admin socket. var sigPubKey keyArray
state.Lock() copy(sigPubKey[:], sigPub)
state._conn = nil if options.pinnedEd25519Keys == nil {
state._err = err options.pinnedEd25519Keys = map[keyArray]struct{}{}
state._errtime = time.Now() }
state.Unlock() options.pinnedEd25519Keys[sigPubKey] = struct{}{}
}
if p := u.Query().Get("priority"); p != "" {
pi, err := strconv.ParseUint(p, 10, 8)
if err != nil {
retErr = ErrLinkPriorityInvalid
return
}
options.priority = uint8(pi)
}
// Back off for a bit. If true is returned here, we // If we think we're already connected to this peer, load up
// can continue onto the next loop iteration to try // the existing peer state. Try to kick the peer if possible,
// the next connection. // which will cause an immediate connection attempt if it is
// backing off for some reason.
state, ok := l._links[info]
if ok && state != nil {
select {
case state.kick <- struct{}{}:
default:
}
retErr = ErrLinkAlreadyConfigured
return
}
// Create the link entry. This will contain the connection
// in progress (if any), any error details and a context that
// lets the link be cancelled later.
state = &link{
linkType: linkType,
linkProto: strings.ToUpper(u.Scheme),
kick: make(chan struct{}),
}
// Store the state of the link so that it can be queried later.
l._links[info] = state
// Track how many consecutive connection failures we have had,
// as we will back off exponentially rather than hammering the
// remote node endlessly.
var backoff int
// backoffNow is called when there's a connection error. It
// will wait for the specified amount of time and then return
// true, unless the peering context was cancelled (due to a
// peer removal most likely), in which case it returns false.
// The caller should check the return value to decide whether
// or not to give up trying.
backoffNow := func() bool {
backoff++
duration := time.Second * time.Duration(math.Exp2(float64(backoff)))
select {
case <-time.After(duration):
return true
case <-state.kick:
return true
case <-l.core.ctx.Done():
return false
}
}
// The goroutine is responsible for attempting the connection
// and then running the handler. If the connection is persistent
// then the loop will run endlessly, using backoffs as needed.
// Otherwise the loop will end, cleaning up the link entry.
go func() {
defer func() {
phony.Block(l, func() {
if l._links[info] == state {
delete(l._links, info)
}
})
}()
// This loop will run each and every time we want to attempt
// a connection to this peer.
// TODO get rid of this loop, this is *exactly* what time.AfterFunc is for, we should just send a signal to the links actor to kick off a goroutine as needed
for {
conn, err := l.connect(u, info, options)
if err != nil {
if linkType == linkTypePersistent {
// If the link is a persistent configured peering,
// store information about the connection error so
// that we can report it through the admin socket.
phony.Block(l, func() {
state._conn = nil
state._err = err
state._errtime = time.Now()
})
// Back off for a bit. If true is returned here, we
// can continue onto the next loop iteration to try
// the next connection.
if backoffNow() {
continue
} else {
return
}
} else {
// Ephemeral and incoming connections don't remain
// after a connection failure, so exit out of the
// loop and clean up the link entry.
break
}
}
// The linkConn wrapper allows us to track the number of
// bytes written to and read from this connection without
// the help of ironwood.
lc := &linkConn{
Conn: conn,
up: time.Now(),
}
// Update the link state with our newly wrapped connection.
// Clear the error state.
var doRet bool
phony.Block(l, func() {
if state._conn != nil {
// If a peering has come up in this time, abort this one.
doRet = true
}
state._conn = lc
})
if doRet {
return
}
// Give the connection to the handler. The handler will block
// for the lifetime of the connection.
if err = l.handler(linkType, options, lc); err != nil && err != io.EOF {
l.core.log.Debugf("Link %s error: %s\n", info.uri, err)
} else {
backoff = 0
}
// The handler has stopped running so the connection is dead,
// try to close the underlying socket just in case and then
// update the link state.
_ = lc.Close()
phony.Block(l, func() {
state._conn = nil
if state._err = err; state._err != nil {
state._errtime = time.Now()
}
})
// If the link is persistently configured, back off if needed
// and then try reconnecting. Otherwise, exit out.
if linkType == linkTypePersistent {
if backoffNow() { if backoffNow() {
continue continue
} else { } else {
return return
} }
} else { } else {
// Ephemeral and incoming connections don't remain
// after a connection failure, so exit out of the
// loop and clean up the link entry.
break break
} }
} }
}()
// The linkConn wrapper allows us to track the number of })
// bytes written to and read from this connection without return retErr
// the help of ironwood.
lc := &linkConn{
Conn: conn,
up: time.Now(),
}
// Update the link state with our newly wrapped connection.
// Clear the error state.
state.Lock()
if state._conn != nil {
// If a peering has come up in this time, abort this one.
state.Unlock()
return
}
state._conn = lc
state.Unlock()
// Give the connection to the handler. The handler will block
// for the lifetime of the connection.
if err = l.handler(linkType, options, lc); err != nil && err != io.EOF {
l.core.log.Debugf("Link %s error: %s\n", info.uri, err)
} else {
backoff = 0
}
// The handler has stopped running so the connection is dead,
// try to close the underlying socket just in case and then
// update the link state.
_ = lc.Close()
state.Lock()
state._conn = nil
if state._err = err; state._err != nil {
state._errtime = time.Now()
}
state.Unlock()
// If the link is persistently configured, back off if needed
// and then try reconnecting. Otherwise, exit out.
if linkType == linkTypePersistent {
if backoffNow() {
continue
} else {
return
}
} else {
break
}
}
}()
return nil
} }
func (l *links) listen(u *url.URL, sintf string) (*Listener, error) { func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
@ -369,43 +378,45 @@ func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
// If there's an existing link state for this link, get it. // If there's an existing link state for this link, get it.
// If this node is already connected to us, just drop the // If this node is already connected to us, just drop the
// connection. This prevents duplicate peerings. // connection. This prevents duplicate peerings.
l.Lock() var lc *linkConn
state, ok := l._links[info] var state *link
if !ok || state == nil { phony.Block(l, func() {
state = &link{ var ok bool
linkType: linkTypeIncoming, state, ok = l._links[info]
linkProto: strings.ToUpper(u.Scheme), if !ok || state == nil {
kick: make(chan struct{}), state = &link{
linkType: linkTypeIncoming,
linkProto: strings.ToUpper(u.Scheme),
kick: make(chan struct{}),
}
} }
} if state._conn != nil {
state.Lock() // If a connection has come up in this time, abort
if state._conn != nil { // this one.
// If a connection has come up in this time, abort return
// this one. }
state.Unlock()
l.Unlock() // The linkConn wrapper allows us to track the number of
// bytes written to and read from this connection without
// the help of ironwood.
lc = &linkConn{
Conn: conn,
up: time.Now(),
}
// Update the link state with our newly wrapped connection.
// Clear the error state.
state._conn = lc
state._err = nil
state._errtime = time.Time{}
// Store the state of the link so that it can be queried later.
l._links[info] = state
})
if lc == nil {
return return
} }
// The linkConn wrapper allows us to track the number of
// bytes written to and read from this connection without
// the help of ironwood.
lc := &linkConn{
Conn: conn,
up: time.Now(),
}
// Update the link state with our newly wrapped connection.
// Clear the error state.
state._conn = lc
state._err = nil
state._errtime = time.Time{}
state.Unlock()
// Store the state of the link so that it can be queried later.
l._links[info] = state
l.Unlock()
// Give the connection to the handler. The handler will block // Give the connection to the handler. The handler will block
// for the lifetime of the connection. // for the lifetime of the connection.
if err = l.handler(linkTypeIncoming, options, lc); err != nil && err != io.EOF { if err = l.handler(linkTypeIncoming, options, lc); err != nil && err != io.EOF {
@ -416,9 +427,11 @@ func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
// try to close the underlying socket just in case and then // try to close the underlying socket just in case and then
// drop the link state. // drop the link state.
_ = lc.Close() _ = lc.Close()
l.Lock() phony.Block(l, func() {
delete(l._links, info) if l._links[info] == state {
l.Unlock() delete(l._links, info)
}
})
}(conn) }(conn)
} }
}() }()

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"math/rand"
"net" "net"
"net/url" "net/url"
"time" "time"
@ -337,7 +338,8 @@ func (m *Multicast) _announce() {
break break
} }
} }
m._timer = time.AfterFunc(time.Second, func() { annInterval := time.Second + time.Microsecond*(time.Duration(rand.Intn(1048576))) // Randomize delay
m._timer = time.AfterFunc(annInterval, func() {
m.Act(nil, m._announce) m.Act(nil, m._announce)
}) })
} }