From 333561f4e17206f0949409e27f4cebf2a78fe1f7 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Sat, 20 May 2023 23:44:31 +0100 Subject: [PATCH] Tweak link state locking, add comments, listener priority, other fixes --- src/core/api.go | 27 ++++---- src/core/link.go | 173 ++++++++++++++++++++++++++++++++--------------- 2 files changed, 132 insertions(+), 68 deletions(-) diff --git a/src/core/api.go b/src/core/api.go index c8d4515..c6e6038 100644 --- a/src/core/api.go +++ b/src/core/api.go @@ -9,7 +9,6 @@ import ( "time" "github.com/Arceliar/ironwood/network" - "github.com/Arceliar/phony" "github.com/yggdrasil-network/yggdrasil-go/src/address" ) @@ -76,19 +75,19 @@ func (c *Core) GetPeers() []PeerInfo { for info, state := range c.links._links { var peerinfo PeerInfo var conn net.Conn - phony.Block(state, func() { - peerinfo.URI = info.uri - peerinfo.LastError = state._err - peerinfo.LastErrorTime = state._errtime - if c := state._conn; c != nil { - conn = c - peerinfo.Up = true - peerinfo.Inbound = info.linkType == linkTypeIncoming - peerinfo.RXBytes = c.rx - peerinfo.TXBytes = c.tx - peerinfo.Uptime = time.Since(c.up) - } - }) + state.RLock() + peerinfo.URI = info.uri + peerinfo.LastError = state._err + peerinfo.LastErrorTime = state._errtime + if c := state._conn; c != nil { + conn = c + peerinfo.Up = true + peerinfo.Inbound = state.linkType == linkTypeIncoming + peerinfo.RXBytes = c.rx + peerinfo.TXBytes = c.tx + peerinfo.Uptime = time.Since(c.up) + } + state.RUnlock() if p, ok := conns[conn]; ok { peerinfo.Key = p.Key peerinfo.Root = p.Root diff --git a/src/core/link.go b/src/core/link.go index a432e08..d68d1bf 100644 --- a/src/core/link.go +++ b/src/core/link.go @@ -46,23 +46,19 @@ type linkProtocol interface { // linkInfo is used as a map key type linkInfo struct { - uri string // Peering URI in complete form - sintf string // Peering source interface (i.e. from InterfacePeers) - linkType linkType // Type of link, i.e. outbound/inbound, persistent/ephemeral + uri string // Peering URI in complete form + sintf string // Peering source interface (i.e. from InterfacePeers) } // link tracks the state of a connection, either persistent or non-persistent type link struct { - phony.Inbox - ctx context.Context // - cancel context.CancelFunc // - kick chan struct{} // Attempt to reconnect now, if backing off - info linkInfo // - linkProto string // Protocol carrier of link, e.g. TCP, AWDL - _conn *linkConn // Connected link, if any, nil if not connected - _err error // Last error on the connection, if any - _errtime time.Time // Last time an error occured - + kick chan struct{} // Attempt to reconnect now, if backing off + linkType linkType // Type of link, i.e. outbound/inbound, persistent/ephemeral + linkProto string // Protocol carrier of link, e.g. TCP, AWDL + sync.RWMutex // Protects the below + _conn *linkConn // Connected link, if any, nil if not connected + _err error // Last error on the connection, if any + _errtime time.Time // Last time an error occured } type linkOptions struct { @@ -149,10 +145,14 @@ func (l *links) add(u *url.URL, sintf string, linkType linkType) error { // have an open peering to this peer. lu := urlForLinkInfo(*u) info := linkInfo{ - uri: lu.String(), - sintf: sintf, - linkType: linkType, + uri: lu.String(), + sintf: sintf, } + + // If we think we're already connected to this peer, load up + // the existing peer state. Try to kick the peer if possible, + // which will cause an immediate connection attempt if it is + // backing off for some reason. l.RLock() state, ok := l._links[info] l.RUnlock() @@ -167,12 +167,10 @@ func (l *links) add(u *url.URL, sintf string, linkType linkType) error { // Create the link entry. This will contain the connection // in progress (if any), any error details and a context that // lets the link be cancelled later. - ctx, cancel := context.WithCancel(l.core.ctx) state = &link{ - info: info, + linkType: linkType, linkProto: strings.ToUpper(u.Scheme), - ctx: ctx, - cancel: cancel, + kick: make(chan struct{}), } // Collect together the link options, these are global options @@ -198,8 +196,7 @@ func (l *links) add(u *url.URL, sintf string, linkType linkType) error { options.priority = uint8(pi) } - // Store the state of the link, try to connect and then run - // the handler. + // Store the state of the link so that it can be queried later. l.Lock() l._links[info] = state l.Unlock() @@ -223,7 +220,7 @@ func (l *links) add(u *url.URL, sintf string, linkType linkType) error { return true case <-state.kick: return true - case <-ctx.Done(): + case <-l.core.ctx.Done(): return false } } @@ -238,44 +235,75 @@ func (l *links) add(u *url.URL, sintf string, linkType linkType) error { defer l.Unlock() delete(l._links, info) }() + + // This loop will run each and every time we want to attempt + // a connection to this peer. for { conn, err := l.connect(u, info, options) if err != nil { if linkType == linkTypePersistent { - phony.Block(state, func() { - state._err = err - state._errtime = time.Now() - }) + // If the link is a persistent configured peering, + // store information about the connection error so + // that we can report it through the admin socket. + state.Lock() + state._conn = nil + state._err = err + state._errtime = time.Now() + state.Unlock() + + // Back off for a bit. If true is returned here, we + // can continue onto the next loop iteration to try + // the next connection. if backoffNow() { continue } else { return } } else { + // Ephemeral and incoming connections don't remain + // after a connection failure, so exit out of the + // loop and clean up the link entry. break } } + + // The linkConn wrapper allows us to track the number of + // bytes written to and read from this connection without + // the help of ironwood. lc := &linkConn{ Conn: conn, up: time.Now(), } - phony.Block(state, func() { - state._conn = lc - state._err = nil - state._errtime = time.Time{} - }) - if err = l.handler(&info, options, lc); err != nil && err != io.EOF { + + // Update the link state with our newly wrapped connection. + // Clear the error state. + state.Lock() + state._conn = lc + state._err = nil + state._errtime = time.Time{} + state.Unlock() + + // Give the connection to the handler. The handler will block + // for the lifetime of the connection. + if err = l.handler(linkType, options, lc); err != nil && err != io.EOF { l.core.log.Debugf("Link %s error: %s\n", info.uri, err) } else { backoff = 0 } - _ = conn.Close() - phony.Block(state, func() { - state._conn = nil - if state._err = err; state._err != nil { - state._errtime = time.Now() - } - }) + + // The handler has stopped running so the connection is dead, + // try to close the underlying socket just in case and then + // update the link state. + _ = lc.Close() + state.Lock() + state._conn = nil + if state._err = err; state._err != nil { + state._errtime = time.Now() + } + state.Unlock() + + // If the link is persistently configured, back off if needed + // and then try reconnecting. Otherwise, exit out. if linkType == linkTypePersistent { if backoffNow() { continue @@ -314,6 +342,16 @@ func (l *links) listen(u *url.URL, sintf string) (*Listener, error) { ctx: ctx, Cancel: cancel, } + + var options linkOptions + if p := u.Query().Get("priority"); p != "" { + pi, err := strconv.ParseUint(p, 10, 8) + if err != nil { + return nil, ErrLinkPriorityInvalid + } + options.priority = uint8(pi) + } + go func() { l.core.log.Printf("%s listener started on %s", strings.ToUpper(u.Scheme), listener.Addr()) defer l.core.log.Printf("%s listener stopped on %s", strings.ToUpper(u.Scheme), listener.Addr()) @@ -324,41 +362,68 @@ func (l *links) listen(u *url.URL, sintf string) (*Listener, error) { } go func(conn net.Conn) { defer conn.Close() + + // In order to populate a somewhat sane looking connection + // URI in the admin socket, we need to replace the host in + // the listener URL with the remote address. pu := *u pu.Host = conn.RemoteAddr().String() lu := urlForLinkInfo(pu) info := linkInfo{ - uri: lu.String(), - sintf: sintf, - linkType: linkTypeEphemeral, // TODO: should be incoming + uri: lu.String(), + sintf: sintf, } + + // If this node is already connected to us, just drop the + // connection. This prevents duplicate peerings. if l.isConnectedTo(info) { return } + + // If there's an existing link state for this link, get it. + // Otherwise just create a new one. l.RLock() state, ok := l._links[info] l.RUnlock() if !ok || state == nil { state = &link{ - info: info, + linkType: linkTypeIncoming, + linkProto: strings.ToUpper(u.Scheme), + kick: make(chan struct{}), } } + + // The linkConn wrapper allows us to track the number of + // bytes written to and read from this connection without + // the help of ironwood. lc := &linkConn{ Conn: conn, up: time.Now(), } - var options linkOptions - phony.Block(state, func() { - state._conn = lc - state._err = nil - state.linkProto = strings.ToUpper(u.Scheme) - }) + + // Update the link state with our newly wrapped connection. + // Clear the error state. + state.Lock() + state._conn = lc + state._err = nil + state._errtime = time.Time{} + state.Unlock() + + // Store the state of the link so that it can be queried later. l.Lock() l._links[info] = state l.Unlock() - if err = l.handler(&info, options, lc); err != nil && err != io.EOF { + + // Give the connection to the handler. The handler will block + // for the lifetime of the connection. + if err = l.handler(linkTypeIncoming, options, lc); err != nil && err != io.EOF { l.core.log.Debugf("Link %s error: %s\n", u.Host, err) } + + // The handler has stopped running so the connection is dead, + // try to close the underlying socket just in case and then + // drop the link state. + _ = lc.Close() l.Lock() delete(l._links, info) l.Unlock() @@ -401,7 +466,7 @@ func (l *links) connect(u *url.URL, info linkInfo, options linkOptions) (net.Con return dialer.dial(u, info, options) } -func (l *links) handler(info *linkInfo, options linkOptions, conn net.Conn) error { +func (l *links) handler(linkType linkType, options linkOptions, conn net.Conn) error { meta := version_getBaseMetadata() meta.publicKey = l.core.public metaBytes := meta.encode() @@ -453,12 +518,12 @@ func (l *links) handler(info *linkInfo, options linkOptions, conn net.Conn) erro break } } - if info.linkType == linkTypeIncoming && !isallowed { + if linkType == linkTypeIncoming && !isallowed { return fmt.Errorf("node public key %q is not in AllowedPublicKeys", hex.EncodeToString(meta.publicKey)) } dir := "outbound" - if info.linkType == linkTypeIncoming { + if linkType == linkTypeIncoming { dir = "inbound" } remoteAddr := net.IP(address.AddrForKey(meta.publicKey)[:]).String()