5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2025-01-22 09:13:18 +00:00

Merge pull request #338 from Arceliar/fixes

Fixes
This commit is contained in:
Arceliar 2019-03-01 19:18:45 -06:00 committed by GitHub
commit 98a84ec7e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 208 additions and 81 deletions

View File

@ -6,13 +6,15 @@ import "os"
import "strings"
import "strconv"
import "time"
import "log"
import "runtime"
import "runtime/pprof"
import "flag"
import "github.com/gologme/log"
import . "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil"
import . "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
////////////////////////////////////////////////////////////////////////////////

View File

@ -56,3 +56,23 @@ func TimerStop(t *time.Timer) bool {
}
return true
}
// Run a blocking function with a timeout.
// Returns true if the function returns.
// Returns false if the timer fires.
// The blocked function remains blocked--the caller is responsible for somehow killing it.
func FuncTimeout(f func(), timeout time.Duration) bool {
success := make(chan struct{})
go func() {
defer close(success)
f()
}()
timer := time.NewTimer(timeout)
defer TimerStop(timer)
select {
case <-success:
return true
case <-timer.C:
return false
}
}

View File

@ -125,11 +125,15 @@ func (c *Core) addPeerLoop() {
// UpdateConfig updates the configuration in Core and then signals the
// various module goroutines to reconfigure themselves if needed
func (c *Core) UpdateConfig(config *config.NodeConfig) {
c.log.Infoln("Reloading configuration...")
c.configMutex.Lock()
c.configOld = c.config
c.config = *config
c.configMutex.Unlock()
errors := 0
components := []chan chan error{
c.admin.reconfigure,
c.searches.reconfigure,
@ -148,9 +152,16 @@ func (c *Core) UpdateConfig(config *config.NodeConfig) {
response := make(chan error)
component <- response
if err := <-response; err != nil {
c.log.Println(err)
c.log.Errorln(err)
errors++
}
}
if errors > 0 {
c.log.Warnln(errors, "modules reported errors during configuration reload")
} else {
c.log.Infoln("Configuration reloaded successfully")
}
}
// GetBuildName gets the current build name. This is usually injected if built

View File

@ -4,6 +4,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"io"
"net"
"strings"
"sync"
@ -91,11 +92,16 @@ func (intf *linkInterface) handler() error {
meta.link = *myLinkPub
metaBytes := meta.encode()
// TODO timeouts on send/recv (goroutine for send/recv, channel select w/ timer)
err := intf.msgIO._sendMetaBytes(metaBytes)
var err error
if !util.FuncTimeout(func() { err = intf.msgIO._sendMetaBytes(metaBytes) }, 30*time.Second) {
return errors.New("timeout on metadata send")
}
if err != nil {
return err
}
metaBytes, err = intf.msgIO._recvMetaBytes()
if !util.FuncTimeout(func() { metaBytes, err = intf.msgIO._recvMetaBytes() }, 30*time.Second) {
return errors.New("timeout on metadata recv")
}
if err != nil {
return err
}
@ -109,8 +115,8 @@ func (intf *linkInterface) handler() error {
return errors.New("failed to connect: wrong version")
}
// Check if we're authorized to connect to this key / IP
if !intf.force && !intf.link.core.peers.isAllowedEncryptionPublicKey(&meta.box) {
intf.link.core.log.Debugf("%s connection to %s forbidden: AllowedEncryptionPublicKeys does not contain key %s",
if !intf.incoming && !intf.force && !intf.link.core.peers.isAllowedEncryptionPublicKey(&meta.box) {
intf.link.core.log.Warnf("%s connection to %s forbidden: AllowedEncryptionPublicKeys does not contain key %s",
strings.ToUpper(intf.info.linkType), intf.info.remote, hex.EncodeToString(meta.box[:]))
intf.msgIO.close()
return nil
@ -162,8 +168,6 @@ func (intf *linkInterface) handler() error {
themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote)
intf.link.core.log.Infof("Connected %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
defer intf.link.core.log.Infof("Disconnected %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
// Start the link loop
go intf.peer.linkLoop()
// Start the writer
@ -216,8 +220,8 @@ func (intf *linkInterface) handler() error {
case signalReady <- struct{}{}:
default:
}
intf.link.core.log.Debugf("Sending packet to %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
//intf.link.core.log.Debugf("Sending packet to %s: %s, source %s",
// strings.ToUpper(intf.info.linkType), themString, intf.info.local)
}
}
}()
@ -225,26 +229,32 @@ func (intf *linkInterface) handler() error {
// Used to enable/disable activity in the switch
signalAlive := make(chan bool, 1) // True = real packet, false = keep-alive
defer close(signalAlive)
ret := make(chan error, 1) // How we signal the return value when multiple goroutines are involved
go func() {
var isAlive bool
var isReady bool
var sendTimerRunning bool
var recvTimerRunning bool
recvTime := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed
recvTime := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed
closeTime := 2 * switch_timeout // TODO or maybe this makes more sense for ReadTimeout?...
sendTime := time.Second
sendTimer := time.NewTimer(sendTime)
defer util.TimerStop(sendTimer)
recvTimer := time.NewTimer(recvTime)
defer util.TimerStop(recvTimer)
closeTimer := time.NewTimer(closeTime)
defer util.TimerStop(closeTimer)
for {
intf.link.core.log.Debugf("State of %s: %s, source %s :: isAlive %t isReady %t sendTimerRunning %t recvTimerRunning %t",
strings.ToUpper(intf.info.linkType), themString, intf.info.local,
isAlive, isReady, sendTimerRunning, recvTimerRunning)
//intf.link.core.log.Debugf("State of %s: %s, source %s :: isAlive %t isReady %t sendTimerRunning %t recvTimerRunning %t",
// strings.ToUpper(intf.info.linkType), themString, intf.info.local,
// isAlive, isReady, sendTimerRunning, recvTimerRunning)
select {
case gotMsg, ok := <-signalAlive:
if !ok {
return
}
util.TimerStop(closeTimer)
closeTimer.Reset(closeTime)
util.TimerStop(recvTimer)
recvTimerRunning = false
isAlive = true
@ -299,6 +309,14 @@ func (intf *linkInterface) handler() error {
case <-recvTimer.C:
// We haven't received anything, so assume there's a problem and don't return this node to the switch until they start responding
isAlive = false
case <-closeTimer.C:
// We haven't received anything in a really long time, so things have died at the switch level and then some...
// Just close the connection at this point...
select {
case ret <- errors.New("timeout"):
default:
}
intf.msgIO.close()
}
}
}()
@ -309,7 +327,13 @@ func (intf *linkInterface) handler() error {
intf.peer.handlePacket(msg)
}
if err != nil {
return err
if err != io.EOF {
select {
case ret <- err:
default:
}
}
break
}
select {
case signalAlive <- len(msg) > 0:
@ -317,5 +341,15 @@ func (intf *linkInterface) handler() error {
}
}
////////////////////////////////////////////////////////////////////////////////
return nil
// Remember to set `err` to something useful before returning
select {
case err = <-ret:
intf.link.core.log.Infof("Disconnected %s: %s, source %s; error: %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local, err)
default:
err = nil
intf.link.core.log.Infof("Disconnected %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
}
return err
}

View File

@ -66,19 +66,36 @@ func (r *router) init(core *Core) {
r.reconfigure = make(chan chan error, 1)
r.addr = *address.AddrForNodeID(&r.core.dht.nodeID)
r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID)
in := make(chan []byte, 32) // TODO something better than this...
in := make(chan []byte, 1) // TODO something better than this...
p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, "(self)", nil)
p.out = func(packet []byte) {
// This is to make very sure it never blocks
select {
case in <- packet:
return
default:
util.PutBytes(packet)
}
}
p.out = func(packet []byte) { in <- packet }
r.in = in
r.out = func(packet []byte) { p.handlePacket(packet) } // The caller is responsible for go-ing if it needs to not block
out := make(chan []byte, 32)
go func() {
for packet := range out {
p.handlePacket(packet)
}
}()
out2 := make(chan []byte, 32)
go func() {
// This worker makes sure r.out never blocks
// It will buffer traffic long enough for the switch worker to take it
// If (somehow) you can send faster than the switch can receive, then this would use unbounded memory
// But crypto slows sends enough that the switch should always be able to take the packets...
var buf [][]byte
for {
buf = append(buf, <-out2)
for len(buf) > 0 {
select {
case bs := <-out2:
buf = append(buf, bs)
case out <- buf[0]:
buf = buf[1:]
}
}
}
}()
r.out = func(packet []byte) { out2 <- packet }
r.toRecv = make(chan router_recvPacket, 32)
recv := make(chan []byte, 32)
send := make(chan []byte, 32)
@ -305,7 +322,6 @@ func (r *router) sendPacket(bs []byte) {
// Don't continue - drop the packet
return
}
sinfo.send <- bs
}
}

View File

@ -525,17 +525,36 @@ func (ss *sessions) resetInits() {
// It handles calling the relatively expensive crypto operations.
// It's also responsible for checking nonces and dropping out-of-date/duplicate packets, or else calling the function to update nonces if the packet is OK.
func (sinfo *sessionInfo) doWorker() {
send := make(chan []byte, 32)
defer close(send)
go func() {
for bs := range send {
sinfo.doSend(bs)
}
}()
recv := make(chan *wire_trafficPacket, 32)
defer close(recv)
go func() {
for p := range recv {
sinfo.doRecv(p)
}
}()
for {
select {
case p, ok := <-sinfo.recv:
if ok {
sinfo.doRecv(p)
select {
case recv <- p:
default:
// We need something to not block, and it's best to drop it before we decrypt
util.PutBytes(p.Payload)
}
} else {
return
}
case bs, ok := <-sinfo.send:
if ok {
sinfo.doSend(bs)
send <- bs
} else {
return
}
@ -625,8 +644,5 @@ func (sinfo *sessionInfo) doRecv(p *wire_trafficPacket) {
sinfo.updateNonce(&p.Nonce)
sinfo.time = time.Now()
sinfo.bytesRecvd += uint64(len(bs))
select {
case sinfo.core.router.toRecv <- router_recvPacket{bs, sinfo}:
default: // avoid deadlocks, maybe do this somewhere else?...
}
sinfo.core.router.toRecv <- router_recvPacket{bs, sinfo}
}

View File

@ -176,6 +176,7 @@ type switchTable struct {
admin chan func() // Pass a lambda for the admin socket to query stuff
queues switch_buffers // Queues - not atomic so ONLY use through admin chan
queueTotalMaxSize uint64 // Maximum combined size of queues
toRouter chan []byte // Packets to be sent to the router
}
// Minimum allowed total size of switch queues.
@ -199,6 +200,7 @@ func (t *switchTable) init(core *Core) {
t.idleIn = make(chan switchPort, 1024)
t.admin = make(chan func())
t.queueTotalMaxSize = SwitchQueueTotalMinSize
t.toRouter = make(chan []byte, 1)
}
// Safely gets a copy of this node's locator.
@ -215,7 +217,6 @@ func (t *switchTable) doMaintenance() {
defer t.mutex.Unlock() // Release lock when we're done
t.cleanRoot()
t.cleanDropped()
t.cleanPeers()
}
// Updates the root periodically if it is ourself, or promotes ourself to root if we're better than the current root or if the current root has timed out.
@ -272,28 +273,6 @@ func (t *switchTable) forgetPeer(port switchPort) {
}
}
// Clean all unresponsive peers from the table, needed in case a peer stops updating.
// Needed in case a non-parent peer keeps the connection open but stops sending updates.
// Also reclaims space from deleted peers by copying the map.
func (t *switchTable) cleanPeers() {
now := time.Now()
for port, peer := range t.data.peers {
if now.Sub(peer.time) > switch_timeout+switch_throttle {
// Longer than switch_timeout to make sure we don't remove a working peer because the root stopped responding.
delete(t.data.peers, port)
go t.core.peers.removePeer(port) // TODO figure out if it's safe to do this without a goroutine, or make it safe
}
}
if _, isIn := t.data.peers[t.parent]; !isIn {
// The root timestamp would probably time out before this happens, but better safe than sorry.
// We removed the current parent, so find a new one.
t.parent = 0
for _, peer := range t.data.peers {
t.unlockedHandleMsg(&peer.msg, peer.port, true)
}
}
}
// Dropped is a list of roots that are better than the current root, but stopped sending new timestamps.
// If we switch to a new root, and that root is better than an old root that previously timed out, then we can clean up the old dropped root infos.
// This function is called periodically to do that cleanup.
@ -570,23 +549,23 @@ func (t *switchTable) start() error {
return nil
}
// Check if a packet should go to the self node
// This means there's no node closer to the destination than us
// This is mainly used to identify packets addressed to us, or that hit a blackhole
func (t *switchTable) selfIsClosest(dest []byte) bool {
// Return a map of ports onto distance, keeping only ports closer to the destination than this node
// If the map is empty (or nil), then no peer is closer
func (t *switchTable) getCloser(dest []byte) map[switchPort]int {
table := t.getTable()
myDist := table.self.dist(dest)
if myDist == 0 {
// Skip the iteration step if it's impossible to be closer
return true
return nil
}
closer := make(map[switchPort]int, len(table.elems))
for _, info := range table.elems {
dist := info.locator.dist(dest)
if dist < myDist {
return false
closer[info.port] = dist
}
}
return true
return closer
}
// Returns true if the peer is closer to the destination than ourself
@ -639,26 +618,48 @@ func (t *switchTable) bestPortForCoords(coords []byte) switchPort {
// Returns true if the packet has been handled somehow, false if it should be queued
func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool {
coords := switch_getPacketCoords(packet)
ports := t.core.peers.getPorts()
if t.selfIsClosest(coords) {
closer := t.getCloser(coords)
if len(closer) == 0 {
// TODO? call the router directly, and remove the whole concept of a self peer?
ports[0].sendPacket(packet)
t.toRouter <- packet
return true
}
table := t.getTable()
myDist := table.self.dist(coords)
var best *peer
bestDist := myDist
for port := range idle {
if to := ports[port]; to != nil {
if info, isIn := table.elems[to.port]; isIn {
dist := info.locator.dist(coords)
if !(dist < bestDist) {
continue
}
best = to
bestDist = dist
}
var bestDist int
var bestCoordLen int
ports := t.core.peers.getPorts()
for port, dist := range closer {
to := ports[port]
_, isIdle := idle[port]
coordLen := len(table.elems[port].locator.coords)
var update bool
switch {
case to == nil:
//nothing
case !isIdle:
//nothing
case best == nil:
update = true
case dist < bestDist:
update = true
case dist > bestDist:
//nothing
case coordLen < bestCoordLen:
update = true
/*
case coordLen > bestCoordLen:
//nothing
case port < best.port:
update = true
*/
default:
//nothing
}
if update {
best = to
bestDist = dist
bestCoordLen = coordLen
}
}
if best != nil {
@ -697,7 +698,7 @@ func (b *switch_buffers) cleanup(t *switchTable) {
// Remove queues for which we have no next hop
packet := buf.packets[0]
coords := switch_getPacketCoords(packet.bytes)
if t.selfIsClosest(coords) {
if len(t.getCloser(coords)) == 0 {
for _, packet := range buf.packets {
util.PutBytes(packet.bytes)
}
@ -776,11 +777,38 @@ func (t *switchTable) handleIdle(port switchPort) bool {
// The switch worker does routing lookups and sends packets to where they need to be
func (t *switchTable) doWorker() {
sendingToRouter := make(chan []byte, 1)
go func() {
// Keep sending packets to the router
self := t.core.peers.getPorts()[0]
for bs := range sendingToRouter {
self.sendPacket(bs)
}
}()
go func() {
// Keep taking packets from the idle worker and sending them to the above whenever it's idle, keeping anything extra in a (fifo, head-drop) buffer
var buf [][]byte
for {
buf = append(buf, <-t.toRouter)
for len(buf) > 0 {
select {
case bs := <-t.toRouter:
buf = append(buf, bs)
for len(buf) > 32 {
util.PutBytes(buf[0])
buf = buf[1:]
}
case sendingToRouter <- buf[0]:
buf = buf[1:]
}
}
}
}()
t.queues.switchTable = t
t.queues.bufs = make(map[string]switch_buffer) // Packets per PacketStreamID (string)
idle := make(map[switchPort]struct{}) // this is to deduplicate things
for {
t.core.log.Debugf("Switch state: idle = %d, buffers = %d", len(idle), len(t.queues.bufs))
//t.core.log.Debugf("Switch state: idle = %d, buffers = %d", len(idle), len(t.queues.bufs))
select {
case bytes := <-t.packetIn:
// Try to send it somewhere (or drop it if it's corrupt or at a dead end)