5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-22 22:20:27 +00:00

Fix deadlocks

This commit is contained in:
Neil Alexander 2019-10-24 09:54:57 +01:00
parent 7341fcb9bc
commit 5ca81f916e
No known key found for this signature in database
GPG Key ID: A02A2019A2BB0944

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"net" "net"
"regexp" "regexp"
"sync/atomic"
"time" "time"
"github.com/Arceliar/phony" "github.com/Arceliar/phony"
@ -28,7 +29,7 @@ type Multicast struct {
groupAddr string groupAddr string
listeners map[string]*listenerInfo listeners map[string]*listenerInfo
listenPort uint16 listenPort uint16
isOpen bool isOpen atomic.Value // bool
announcer *time.Timer announcer *time.Timer
platformhandler *time.Timer platformhandler *time.Timer
} }
@ -48,6 +49,7 @@ func (m *Multicast) Init(core *yggdrasil.Core, state *config.NodeState, log *log
current := m.config.GetCurrent() current := m.config.GetCurrent()
m.listenPort = current.LinkLocalTCPPort m.listenPort = current.LinkLocalTCPPort
m.groupAddr = "[ff02::114]:9001" m.groupAddr = "[ff02::114]:9001"
m.isOpen.Store(false)
return nil return nil
} }
@ -59,15 +61,16 @@ func (m *Multicast) Start() error {
phony.Block(m, func() { phony.Block(m, func() {
err = m._start() err = m._start()
}) })
m.log.Debugln("Started multicast module")
return err return err
} }
func (m *Multicast) _start() error { func (m *Multicast) _start() error {
if m.isOpen { if m.IsStarted() {
return fmt.Errorf("multicast module is already started") return fmt.Errorf("multicast module is already started")
} }
if len(m.config.GetCurrent().MulticastInterfaces) == 0 { if len(m.config.GetCurrent().MulticastInterfaces) == 0 {
return fmt.Errorf("no MulticastInterfaces configured") return nil
} }
m.log.Infoln("Starting multicast module") m.log.Infoln("Starting multicast module")
addr, err := net.ResolveUDPAddr("udp", m.groupAddr) addr, err := net.ResolveUDPAddr("udp", m.groupAddr)
@ -87,7 +90,7 @@ func (m *Multicast) _start() error {
// Windows can't set this flag, so we need to handle it in other ways // Windows can't set this flag, so we need to handle it in other ways
} }
m.isOpen = true m.isOpen.Store(true)
go m.listen() go m.listen()
m.Act(m, m.multicastStarted) m.Act(m, m.multicastStarted)
m.Act(m, m.announce) m.Act(m, m.announce)
@ -97,21 +100,25 @@ func (m *Multicast) _start() error {
// IsStarted returns true if the module has been started. // IsStarted returns true if the module has been started.
func (m *Multicast) IsStarted() bool { func (m *Multicast) IsStarted() bool {
var isOpen bool if m.isOpen.Load() == nil {
phony.Block(m, func() { return false
isOpen = m.isOpen }
}) return m.isOpen.Load().(bool)
return isOpen
} }
// Stop stops the multicast module. // Stop stops the multicast module.
func (m *Multicast) Stop() { func (m *Multicast) Stop() error {
m.Act(m, m._stop) var err error
phony.Block(m, func() {
err = m._stop()
})
m.log.Debugln("Stopped multicast module")
return nil
} }
func (m *Multicast) _stop() { func (m *Multicast) _stop() error {
m.log.Infoln("Stopping multicast module") m.log.Infoln("Stopping multicast module")
m.isOpen = false m.isOpen.Store(false)
if m.announcer != nil { if m.announcer != nil {
m.announcer.Stop() m.announcer.Stop()
} }
@ -119,6 +126,7 @@ func (m *Multicast) _stop() {
m.platformhandler.Stop() m.platformhandler.Stop()
} }
m.sock.Close() m.sock.Close()
return nil
} }
// UpdateConfig updates the multicast module with the provided config.NodeConfig // UpdateConfig updates the multicast module with the provided config.NodeConfig
@ -129,18 +137,23 @@ func (m *Multicast) UpdateConfig(config *config.NodeConfig) {
} }
func (m *Multicast) _updateConfig(config *config.NodeConfig) { func (m *Multicast) _updateConfig(config *config.NodeConfig) {
m.log.Debugln("Reloading multicast configuration...") m.log.Infoln("Reloading multicast configuration...")
if m.IsStarted() { if m.IsStarted() {
if len(config.MulticastInterfaces) == 0 || config.LinkLocalTCPPort != m.listenPort { if len(config.MulticastInterfaces) == 0 || config.LinkLocalTCPPort != m.listenPort {
m.Stop() if err := m._stop(); err != nil {
m.log.Errorln("Error stopping multicast module:", err)
}
} }
} }
m.config.Replace(*config) m.config.Replace(*config)
m.listenPort = config.LinkLocalTCPPort m.listenPort = config.LinkLocalTCPPort
if !m.IsStarted() && len(config.MulticastInterfaces) > 0 { if !m.IsStarted() && len(config.MulticastInterfaces) > 0 {
m.Start() if err := m._start(); err != nil {
m.log.Errorln("Error starting multicast module:", err)
} }
} }
m.log.Debugln("Reloaded multicast configuration successfully")
}
// GetInterfaces returns the currently known/enabled multicast interfaces. It is // GetInterfaces returns the currently known/enabled multicast interfaces. It is
// expected that UpdateInterfaces has been called at least once before calling // expected that UpdateInterfaces has been called at least once before calling
@ -312,7 +325,7 @@ func (m *Multicast) listen() {
for { for {
nBytes, rcm, fromAddr, err := m.sock.ReadFrom(bs) nBytes, rcm, fromAddr, err := m.sock.ReadFrom(bs)
if err != nil { if err != nil {
if !m.isOpen { if !m.IsStarted() {
return return
} }
panic(err) panic(err)