5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-29 22:31:37 +00:00

Conn Read/Write operations will block while search completes

This commit is contained in:
Neil Alexander 2019-04-22 22:38:37 +01:00
parent ea8948f378
commit 6e528799e9
No known key found for this signature in database
GPG Key ID: A02A2019A2BB0944
2 changed files with 20 additions and 5 deletions

View File

@ -216,7 +216,7 @@ func (tun *TunAdapter) connReader(conn *yggdrasil.Conn) error {
for { for {
n, err := conn.Read(b) n, err := conn.Read(b)
if err != nil { if err != nil {
//tun.log.Errorln(conn.String(), "TUN/TAP conn read error:", err) tun.log.Errorln(conn.String(), "TUN/TAP conn read error:", err)
continue continue
} }
if n == 0 { if n == 0 {

View File

@ -21,6 +21,7 @@ type Conn struct {
readDeadline atomic.Value // time.Time // TODO timer readDeadline atomic.Value // time.Time // TODO timer
writeDeadline atomic.Value // time.Time // TODO timer writeDeadline atomic.Value // time.Time // TODO timer
searching atomic.Value // bool searching atomic.Value // bool
searchwait chan interface{}
} }
func (c *Conn) String() string { func (c *Conn) String() string {
@ -31,6 +32,8 @@ func (c *Conn) String() string {
func (c *Conn) startSearch() { func (c *Conn) startSearch() {
// The searchCompleted callback is given to the search // The searchCompleted callback is given to the search
searchCompleted := func(sinfo *sessionInfo, err error) { searchCompleted := func(sinfo *sessionInfo, err error) {
// Make sure that any blocks on read/write operations are lifted
defer close(c.searchwait)
// Update the connection with the fact that the search completed, which // Update the connection with the fact that the search completed, which
// allows another search to be triggered if necessary // allows another search to be triggered if necessary
c.searching.Store(false) c.searching.Store(false)
@ -70,6 +73,8 @@ func (c *Conn) startSearch() {
// Nothing was found, so create a new search // Nothing was found, so create a new search
sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo) c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo)
// Allow writes/reads to block until the search is complete
c.searchwait = make(chan interface{})
} }
// Continue the search // Continue the search
c.core.searches.continueSearch(sinfo) c.core.searches.continueSearch(sinfo)
@ -110,12 +115,16 @@ func (c *Conn) Read(b []byte) (int, error) {
// us to block forever here if the session will not reopen. // us to block forever here if the session will not reopen.
// TODO: should this return an error or just a zero-length buffer? // TODO: should this return an error or just a zero-length buffer?
if sinfo == nil || !sinfo.init { if sinfo == nil || !sinfo.init {
return 0, errors.New("session is closed") // block
<-c.searchwait
// return 0, errors.New("session is closed")
} }
// Wait for some traffic to come through from the session // Wait for some traffic to come through from the session
fmt.Println("Start select")
select { select {
// TODO... // TODO...
case p, ok := <-c.recv: case p, ok := <-c.recv:
fmt.Println("Finish select")
// If the session is closed then do nothing // If the session is closed then do nothing
if !ok { if !ok {
return 0, errors.New("session is closed") return 0, errors.New("session is closed")
@ -167,6 +176,9 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
c.mutex.RLock() c.mutex.RLock()
sinfo := c.session sinfo := c.session
c.mutex.RUnlock() c.mutex.RUnlock()
// A search is already taking place so wait for it to finish
if sinfo == nil || !sinfo.init {
}
// If the session doesn't exist, or isn't initialised (which probably means // If the session doesn't exist, or isn't initialised (which probably means
// that the search didn't complete successfully) then try to search again // that the search didn't complete successfully) then try to search again
if sinfo == nil || !sinfo.init { if sinfo == nil || !sinfo.init {
@ -176,10 +188,13 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
c.core.router.doAdmin(func() { c.core.router.doAdmin(func() {
c.startSearch() c.startSearch()
}) })
return 0, errors.New("starting search") //return 0, errors.New("starting search")
} }
// A search is already taking place so wait for it to finish <-c.searchwait
return 0, errors.New("waiting for search to complete") if sinfo == nil || !sinfo.init {
return 0, errors.New("search was failed")
}
//return 0, errors.New("waiting for search to complete")
} }
// defer util.PutBytes(b) // defer util.PutBytes(b)
var packet []byte var packet []byte