package yggdrasil

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
	"github.com/yggdrasil-network/yggdrasil-go/src/util"
)

// ConnError implements the net.Error interface
type ConnError struct {
	error
	timeout   bool
	temporary bool
	closed    bool
	maxsize   int
}

// Timeout returns true if the error relates to a timeout condition on the
// connection.
func (e *ConnError) Timeout() bool {
	return e.timeout
}

// Temporary returns true if the error is temporary or false if it is a
// permanent error condition.
func (e *ConnError) Temporary() bool {
	return e.temporary
}

// PacketTooBig returns true if the error was caused by sending a packet that
// exceeded the maximum size supported by the session, along with the maximum
// packet size that should be used for the connection.
func (e *ConnError) PacketTooBig() (bool, int) {
	return e.maxsize > 0, e.maxsize
}

// Closed returns true if the session is already closed and is now unusable.
func (e *ConnError) Closed() bool {
	return e.closed
}
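
// A caller-side sketch (not part of this package) of how a ConnError returned
// by Read or Write might be inspected; the conn and buf variables are assumed
// to exist in the caller's scope:
//
//	if _, err := conn.Read(buf); err != nil {
//		if ce, ok := err.(ConnError); ok {
//			switch {
//			case ce.Closed():
//				// the session is gone, stop using this Conn
//			case ce.Timeout():
//				// a read deadline expired, it may be safe to retry
//			}
//		}
//	}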

// Conn represents a connection to a remote node, identified by its NodeID,
// over an encrypted session on the Yggdrasil network.
type Conn struct {
	core          *Core
	nodeID        *crypto.NodeID
	nodeMask      *crypto.NodeID
	mutex         sync.RWMutex
	close         chan bool
	session       *sessionInfo
	readDeadline  atomic.Value // time.Time // TODO timer
	writeDeadline atomic.Value // time.Time // TODO timer
}

// TODO func NewConn() that initializes additional fields as needed
func newConn(core *Core, nodeID *crypto.NodeID, nodeMask *crypto.NodeID, session *sessionInfo) *Conn {
	conn := Conn{
		core:     core,
		nodeID:   nodeID,
		nodeMask: nodeMask,
		session:  session,
		close:    make(chan bool),
	}
	return &conn
}
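
// String returns a string that identifies the Conn (currently just its pointer
// value), mainly for use in log messages.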
func (c *Conn) String() string {
	return fmt.Sprintf("conn=%p", c)
}
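
// search performs a DHT search for the remote node and, if it succeeds,
// attaches the resulting session to the Conn and fills in the full node ID.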
// This should never be called from the router goroutine
func (c *Conn) search() error {
	var sinfo *searchInfo
	var isIn bool
	c.core.router.doAdmin(func() { sinfo, isIn = c.core.searches.searches[*c.nodeID] })
	if !isIn {
		done := make(chan struct{}, 1)
		var sess *sessionInfo
		var err error
		searchCompleted := func(sinfo *sessionInfo, e error) {
			sess = sinfo
			err = e
			// This callback can be called multiple times, so do a non-blocking send
			// rather than closing the channel
			select {
			case done <- struct{}{}:
			default:
			}
		}
		c.core.router.doAdmin(func() {
			sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
			sinfo.continueSearch()
		})
		<-done
		c.session = sess
		if c.session == nil && err == nil {
			panic("search failed but returned no error")
		}
		if c.session != nil {
			c.nodeID = crypto.GetNodeID(&c.session.theirPermPub)
			for i := range c.nodeMask {
				c.nodeMask[i] = 0xFF
			}
		}
		return err
	}
	return errors.New("search already exists")
}
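
// getDeadlineTimer returns a timer that is stopped by default, and is armed to
// fire at the deadline stored in value if one has been set.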
func getDeadlineTimer(value *atomic.Value) *time.Timer {
	timer := time.NewTimer(24 * 365 * time.Hour) // FIXME for some reason setting this to 0 doesn't always let it stop and drain the channel correctly
	util.TimerStop(timer)
	if deadline, ok := value.Load().(time.Time); ok {
		timer.Reset(time.Until(deadline))
	}
	return timer
}
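
// Read reads the next packet received over the session into b, blocking until
// a packet arrives, the connection is closed, or the read deadline (if any)
// expires. It returns the number of bytes copied into b; packets larger than b
// are truncated.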
func (c *Conn) Read(b []byte) (int, error) {
	// Take a copy of the session object
	sinfo := c.session
	timer := getDeadlineTimer(&c.readDeadline)
	defer util.TimerStop(timer)
	for {
		// Wait for some traffic to come through from the session
		select {
		case <-c.close:
			return 0, ConnError{errors.New("session closed"), false, false, true, 0}
		case <-timer.C:
			return 0, ConnError{errors.New("read timeout"), true, false, false, 0}
		case p, ok := <-sinfo.recv:
			// If the channel was closed then the session is dead, so stop here
			if !ok {
				return 0, ConnError{errors.New("session closed"), false, false, true, 0}
			}
			defer util.PutBytes(p.Payload)
			var err error
			done := make(chan struct{})
			workerFunc := func() {
				defer close(done)
				// If the nonce is bad then drop the packet and return an error
				if !sinfo.nonceIsOK(&p.Nonce) {
					err = ConnError{errors.New("packet dropped due to invalid nonce"), false, true, false, 0}
					return
				}
				// Decrypt the packet
				bs, isOK := crypto.BoxOpen(&sinfo.sharedSesKey, p.Payload, &p.Nonce)
				defer util.PutBytes(bs) // FIXME commenting this out leads to illegal buffer reuse, which implies there's a memory error somewhere that is just being flooded out of the finite pool of old slices that get reused
				// Check if we were unable to decrypt the packet for some reason and
				// return an error if we couldn't
				if !isOK {
					err = ConnError{errors.New("packet dropped due to decryption failure"), false, true, false, 0}
					return
				}
				// Copy the newly decrypted data into the slice we were given
				copy(b, bs)
				// Trim the slice down to size based on the data we received
				if len(bs) < len(b) {
					b = b[:len(bs)]
				}
				// Update the session
				sinfo.updateNonce(&p.Nonce)
				sinfo.time = time.Now()
				sinfo.bytesRecvd += uint64(len(b))
			}
			// Hand over to the session worker
			defer func() {
				if recover() != nil {
					err = ConnError{errors.New("read failed, session already closed"), false, false, true, 0}
					close(done)
				}
			}() // In case we're racing with a close
			// Send to worker
			select {
			case sinfo.worker <- workerFunc:
			case <-c.close:
				return 0, ConnError{errors.New("session closed"), false, false, true, 0}
			case <-timer.C:
				return 0, ConnError{errors.New("read timeout"), true, false, false, 0}
			}
			// Wait for the worker to finish, failing to do so can cause memory errors (util.[Get||Put]Bytes stuff)
			select {
			case <-done:
			case <-c.close:
				return 0, ConnError{errors.New("session closed"), false, false, true, 0}
			case <-timer.C:
				return 0, ConnError{errors.New("read timeout"), true, false, false, 0}
			}
			// Something went wrong in the session worker, so abort unless the error
			// was only temporary, in which case wait for the next packet
			if err != nil {
				if ce, ok := err.(ConnError); ok && ce.Temporary() {
					continue
				}
				return 0, err
			}
			// If we've reached this point then everything went to plan, return the
			// number of bytes we populated back into the given slice
			return len(b), nil
		}
	}
}
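
// Write sends the contents of b over the session as a single packet, blocking
// until the session worker has processed it or the write deadline (if any)
// expires. Packets larger than the session MTU are rejected with a ConnError
// that reports PacketTooBig.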
func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
	sinfo := c.session
	var packet []byte
	done := make(chan struct{})
	written := len(b)
	workerFunc := func() {
		defer close(done)
		// Does the packet exceed the permitted size for the session?
		if uint16(len(b)) > sinfo.getMTU() {
			written, err = 0, ConnError{errors.New("packet too big"), false, false, false, int(sinfo.getMTU())}
			return
		}
		// Encrypt the packet
		payload, nonce := crypto.BoxSeal(&sinfo.sharedSesKey, b, &sinfo.myNonce)
		defer util.PutBytes(payload)
		// Construct the wire packet to send to the router
		p := wire_trafficPacket{
			Coords:  sinfo.coords,
			Handle:  sinfo.theirHandle,
			Nonce:   *nonce,
			Payload: payload,
		}
		packet = p.encode()
		sinfo.bytesSent += uint64(len(b))
		// The rest of this work is session keep-alive traffic
		doSearch := func() {
			routerWork := func() {
				// Check to see if there is a search already matching the destination
				sinfo, isIn := c.core.searches.searches[*c.nodeID]
				if !isIn {
					// Nothing was found, so create a new search
					searchCompleted := func(sinfo *sessionInfo, e error) {}
					sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
					c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo)
				}
				// Continue the search
				sinfo.continueSearch()
			}
			go func() { c.core.router.admin <- routerWork }()
		}
		switch {
		case time.Since(sinfo.time) > 6*time.Second:
			if sinfo.time.Before(sinfo.pingTime) && time.Since(sinfo.pingTime) > 6*time.Second {
				// TODO double check that the above condition is correct
				doSearch()
			} else {
				sinfo.core.sessions.ping(sinfo)
			}
		case sinfo.reset && sinfo.pingTime.Before(sinfo.time):
			sinfo.core.sessions.ping(sinfo)
		default: // Don't do anything, to keep traffic throttled
		}
	}
	// Set up a timer so this doesn't block forever
	timer := getDeadlineTimer(&c.writeDeadline)
	defer util.TimerStop(timer)
	// Hand over to the session worker
	defer func() {
		if recover() != nil {
			err = ConnError{errors.New("write failed, session already closed"), false, false, true, 0}
			close(done)
		}
	}() // In case we're racing with a close
	select { // Send to worker
	case sinfo.worker <- workerFunc:
	case <-timer.C:
		return 0, ConnError{errors.New("write timeout"), true, false, false, 0}
	}
	// Wait for the worker to finish, otherwise there are memory errors ([Get||Put]Bytes stuff)
	<-done
	// Give the packet to the router
	if written > 0 {
		sinfo.core.router.out(packet)
	}
	// Finally return the number of bytes we wrote
	return written, err
}
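
// Close tears down the connection: it closes the underlying session (if one is
// still open) and unblocks any pending Read or Write calls.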
func (c *Conn) Close() (err error) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	if c.session != nil {
		// Close the session, if it hasn't been closed already
		c.core.router.doAdmin(c.session.close)
	}
	func() {
		defer func() {
			if recover() != nil {
				err = ConnError{errors.New("close failed, session already closed"), false, false, true, 0}
			}
		}()
		close(c.close) // Closes reader/writer goroutines
	}()
	return
}

// LocalAddr returns the node ID of the local side of the connection.
func (c *Conn) LocalAddr() crypto.NodeID {
	return *crypto.GetNodeID(&c.session.core.boxPub)
}

// RemoteAddr returns the node ID of the remote side of the connection.
func (c *Conn) RemoteAddr() crypto.NodeID {
	c.mutex.RLock()
	defer c.mutex.RUnlock()
	return *c.nodeID
}

// SetDeadline sets both the read and write deadlines for the connection.
func (c *Conn) SetDeadline(t time.Time) error {
	c.SetReadDeadline(t)
	c.SetWriteDeadline(t)
	return nil
}

// SetReadDeadline sets the deadline after which Read calls will time out.
func (c *Conn) SetReadDeadline(t time.Time) error {
	c.readDeadline.Store(t)
	return nil
}

// SetWriteDeadline sets the deadline after which Write calls will time out.
func (c *Conn) SetWriteDeadline(t time.Time) error {
	c.writeDeadline.Store(t)
	return nil
}
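
// A caller-side sketch (not part of this package) of using the deadline
// methods to bound a blocking Read; the conn and buf variables are assumed to
// exist in the caller's scope:
//
//	conn.SetReadDeadline(time.Now().Add(5 * time.Second))
//	if _, err := conn.Read(buf); err != nil {
//		if ce, ok := err.(ConnError); ok && ce.Timeout() {
//			// no packet arrived within 5 seconds
//		}
//	}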