5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-12-23 19:15:40 +00:00

Merge pull request #441 from Arceliar/dial

Dial
This commit is contained in:
Neil Alexander 2019-06-29 10:52:05 +01:00 committed by GitHub
commit 0d23342358
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 171 additions and 282 deletions

View File

@ -134,7 +134,7 @@ func (tun *TunAdapter) reader() error {
} }
// Then offset the buffer so that we can now just treat it as an IP // Then offset the buffer so that we can now just treat it as an IP
// packet from now on // packet from now on
bs = bs[offset:] bs = bs[offset:] // FIXME this breaks bs for the next read and means n is the wrong value
} }
// From the IP header, work out what our source and destination addresses // From the IP header, work out what our source and destination addresses
// and node IDs are. We will need these in order to work out where to send // and node IDs are. We will need these in order to work out where to send
@ -225,21 +225,46 @@ func (tun *TunAdapter) reader() error {
panic("Given empty dstNodeID and dstNodeIDMask - this shouldn't happen") panic("Given empty dstNodeID and dstNodeIDMask - this shouldn't happen")
} }
// Dial to the remote node // Dial to the remote node
if conn, err := tun.dialer.DialByNodeIDandMask(dstNodeID, dstNodeIDMask); err == nil { go func() {
// We've been given a connection so prepare the session wrapper // FIXME just spitting out a goroutine to do this is kind of ugly and means we drop packets until the dial finishes
if s, err := tun.wrap(conn); err != nil { tun.mutex.Lock()
// Something went wrong when storing the connection, typically that _, known := tun.dials[*dstNodeID]
// something already exists for this address or subnet packet := append(util.GetBytes(), bs[:n]...)
tun.log.Debugln("TUN/TAP iface wrap:", err) tun.dials[*dstNodeID] = append(tun.dials[*dstNodeID], packet)
} else { for len(tun.dials[*dstNodeID]) > 32 {
// Update our reference to the connection util.PutBytes(tun.dials[*dstNodeID][0])
session, isIn = s, true tun.dials[*dstNodeID] = tun.dials[*dstNodeID][1:]
} }
} else { tun.mutex.Unlock()
// We weren't able to dial for some reason so there's no point in if known {
// continuing this iteration - skip to the next one return
continue }
} var tc *tunConn
if conn, err := tun.dialer.DialByNodeIDandMask(dstNodeID, dstNodeIDMask); err == nil {
// We've been given a connection so prepare the session wrapper
if tc, err = tun.wrap(conn); err != nil {
// Something went wrong when storing the connection, typically that
// something already exists for this address or subnet
tun.log.Debugln("TUN/TAP iface wrap:", err)
}
}
tun.mutex.Lock()
packets := tun.dials[*dstNodeID]
delete(tun.dials, *dstNodeID)
tun.mutex.Unlock()
if tc != nil {
for _, packet := range packets {
select {
case tc.send <- packet:
default:
util.PutBytes(packet)
}
}
}
}()
// While the dial is going on we can't do much else
// continuing this iteration - skip to the next one
continue
} }
// If we have a connection now, try writing to it // If we have a connection now, try writing to it
if isIn && session != nil { if isIn && session != nil {

View File

@ -49,6 +49,7 @@ type TunAdapter struct {
mutex sync.RWMutex // Protects the below mutex sync.RWMutex // Protects the below
addrToConn map[address.Address]*tunConn addrToConn map[address.Address]*tunConn
subnetToConn map[address.Subnet]*tunConn subnetToConn map[address.Subnet]*tunConn
dials map[crypto.NodeID][][]byte // Buffer of packets to send after dialing finishes
isOpen bool isOpen bool
} }
@ -113,6 +114,7 @@ func (tun *TunAdapter) Init(config *config.NodeState, log *log.Logger, listener
tun.dialer = dialer tun.dialer = dialer
tun.addrToConn = make(map[address.Address]*tunConn) tun.addrToConn = make(map[address.Address]*tunConn)
tun.subnetToConn = make(map[address.Subnet]*tunConn) tun.subnetToConn = make(map[address.Subnet]*tunConn)
tun.dials = make(map[crypto.NodeID][][]byte)
} }
// Start the setup process for the TUN/TAP adapter. If successful, starts the // Start the setup process for the TUN/TAP adapter. If successful, starts the
@ -235,6 +237,7 @@ func (tun *TunAdapter) wrap(conn *yggdrasil.Conn) (c *tunConn, err error) {
stop: make(chan struct{}), stop: make(chan struct{}),
alive: make(chan struct{}, 1), alive: make(chan struct{}, 1),
} }
c = &s
// Get the remote address and subnet of the other side // Get the remote address and subnet of the other side
remoteNodeID := conn.RemoteAddr() remoteNodeID := conn.RemoteAddr()
s.addr = *address.AddrForNodeID(&remoteNodeID) s.addr = *address.AddrForNodeID(&remoteNodeID)

View File

@ -517,21 +517,14 @@ func (c *Core) DHTPing(keyString, coordString, targetString string) (DHTRes, err
rq := dhtReqKey{info.key, target} rq := dhtReqKey{info.key, target}
sendPing := func() { sendPing := func() {
c.dht.addCallback(&rq, func(res *dhtRes) { c.dht.addCallback(&rq, func(res *dhtRes) {
defer func() { recover() }() resCh <- res
select {
case resCh <- res:
default:
}
}) })
c.dht.ping(&info, &target) c.dht.ping(&info, &target)
} }
c.router.doAdmin(sendPing) c.router.doAdmin(sendPing)
go func() {
time.Sleep(6 * time.Second)
close(resCh)
}()
// TODO: do something better than the below... // TODO: do something better than the below...
for res := range resCh { res := <-resCh
if res != nil {
r := DHTRes{ r := DHTRes{
Coords: append([]byte{}, res.Coords...), Coords: append([]byte{}, res.Coords...),
} }

View File

@ -45,23 +45,18 @@ type Conn struct {
mutex sync.RWMutex mutex sync.RWMutex
closed bool closed bool
session *sessionInfo session *sessionInfo
readDeadline atomic.Value // time.Time // TODO timer readDeadline atomic.Value // time.Time // TODO timer
writeDeadline atomic.Value // time.Time // TODO timer writeDeadline atomic.Value // time.Time // TODO timer
searching atomic.Value // bool
searchwait chan struct{} // Never reset this, it's only used for the initial search
writebuf [][]byte // Packets to be sent if/when the search finishes
} }
// TODO func NewConn() that initializes additional fields as needed // TODO func NewConn() that initializes additional fields as needed
func newConn(core *Core, nodeID *crypto.NodeID, nodeMask *crypto.NodeID, session *sessionInfo) *Conn { func newConn(core *Core, nodeID *crypto.NodeID, nodeMask *crypto.NodeID, session *sessionInfo) *Conn {
conn := Conn{ conn := Conn{
core: core, core: core,
nodeID: nodeID, nodeID: nodeID,
nodeMask: nodeMask, nodeMask: nodeMask,
session: session, session: session,
searchwait: make(chan struct{}),
} }
conn.searching.Store(false)
return &conn return &conn
} }
@ -69,91 +64,38 @@ func (c *Conn) String() string {
return fmt.Sprintf("conn=%p", c) return fmt.Sprintf("conn=%p", c)
} }
// This method should only be called from the router goroutine // This should only be called from the router goroutine
func (c *Conn) startSearch() { func (c *Conn) search() error {
// The searchCompleted callback is given to the search sinfo, isIn := c.core.searches.searches[*c.nodeID]
searchCompleted := func(sinfo *sessionInfo, err error) { if !isIn {
defer c.searching.Store(false) done := make(chan struct{}, 1)
// If the search failed for some reason, e.g. it hit a dead end or timed var sess *sessionInfo
// out, then do nothing var err error
if err != nil { searchCompleted := func(sinfo *sessionInfo, e error) {
c.core.log.Debugln(c.String(), "DHT search failed:", err) sess = sinfo
return err = e
} // FIXME close can be called multiple times, do a non-blocking send instead
// Take the connection mutex select {
c.mutex.Lock() case done <- struct{}{}:
defer c.mutex.Unlock() default:
// Were we successfully given a sessionInfo pointer?
if sinfo != nil {
// Store it, and update the nodeID and nodeMask (which may have been
// wildcarded before now) with their complete counterparts
c.core.log.Debugln(c.String(), "DHT search completed")
c.session = sinfo
c.nodeID = crypto.GetNodeID(&sinfo.theirPermPub)
for i := range c.nodeMask {
c.nodeMask[i] = 0xFF
} }
// Make sure that any blocks on read/write operations are lifted
defer func() { recover() }() // So duplicate searches don't panic
close(c.searchwait)
} else {
// No session was returned - this shouldn't really happen because we
// should always return an error reason if we don't return a session
panic("DHT search didn't return an error or a sessionInfo")
} }
if c.closed { sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
// Things were closed before the search returned sinfo.continueSearch()
// Go ahead and close it again to make sure the session is cleaned up <-done
go c.Close() c.session = sess
} else { if c.session == nil && err == nil {
// Send any messages we may have buffered panic("search failed but returend no error")
var msgs [][]byte
msgs, c.writebuf = c.writebuf, nil
go func() {
for _, msg := range msgs {
c.Write(msg)
util.PutBytes(msg)
}
}()
} }
} c.nodeID = crypto.GetNodeID(&c.session.theirPermPub)
// doSearch will be called below in response to one or more conditions for i := range c.nodeMask {
doSearch := func() { c.nodeMask[i] = 0xFF
c.searching.Store(true)
// Check to see if there is a search already matching the destination
sinfo, isIn := c.core.searches.searches[*c.nodeID]
if !isIn {
// Nothing was found, so create a new search
sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo)
} }
// Continue the search return err
c.core.searches.continueSearch(sinfo)
}
// Take a copy of the session object, in case it changes later
c.mutex.RLock()
sinfo := c.session
c.mutex.RUnlock()
if c.session == nil {
// No session object is present so previous searches, if we ran any, have
// not yielded a useful result (dead end, remote host not found)
doSearch()
} else { } else {
sinfo.worker <- func() { return errors.New("search already exists")
switch {
case !sinfo.init:
doSearch()
case time.Since(sinfo.time) > 6*time.Second:
if sinfo.time.Before(sinfo.pingTime) && time.Since(sinfo.pingTime) > 6*time.Second {
// TODO double check that the above condition is correct
doSearch()
} else {
c.core.sessions.ping(sinfo)
}
default: // Don't do anything, to keep traffic throttled
}
}
} }
return nil
} }
func getDeadlineTimer(value *atomic.Value) *time.Timer { func getDeadlineTimer(value *atomic.Value) *time.Timer {
@ -167,30 +109,9 @@ func getDeadlineTimer(value *atomic.Value) *time.Timer {
func (c *Conn) Read(b []byte) (int, error) { func (c *Conn) Read(b []byte) (int, error) {
// Take a copy of the session object // Take a copy of the session object
c.mutex.RLock()
sinfo := c.session sinfo := c.session
c.mutex.RUnlock()
timer := getDeadlineTimer(&c.readDeadline) timer := getDeadlineTimer(&c.readDeadline)
defer util.TimerStop(timer) defer util.TimerStop(timer)
// If there is a search in progress then wait for the result
if sinfo == nil {
// Wait for the search to complete
select {
case <-c.searchwait:
case <-timer.C:
return 0, ConnError{errors.New("timeout"), true, false, 0}
}
// Retrieve our session info again
c.mutex.RLock()
sinfo = c.session
c.mutex.RUnlock()
// If sinfo is still nil at this point then the search failed and the
// searchwait channel has been recreated, so might as well give up and
// return an error code
if sinfo == nil {
return 0, errors.New("search failed")
}
}
for { for {
// Wait for some traffic to come through from the session // Wait for some traffic to come through from the session
select { select {
@ -253,32 +174,7 @@ func (c *Conn) Read(b []byte) (int, error) {
} }
func (c *Conn) Write(b []byte) (bytesWritten int, err error) { func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
c.mutex.RLock()
sinfo := c.session sinfo := c.session
c.mutex.RUnlock()
// If the session doesn't exist, or isn't initialised (which probably means
// that the search didn't complete successfully) then we may need to wait for
// the search to complete or start the search again
if sinfo == nil || !sinfo.init {
// Is a search already taking place?
if searching, sok := c.searching.Load().(bool); !sok || (sok && !searching) {
// No search was already taking place so start a new one
c.core.router.doAdmin(c.startSearch)
}
// Buffer the packet to be sent if/when the search is finished
c.mutex.Lock()
defer c.mutex.Unlock()
c.writebuf = append(c.writebuf, append(util.GetBytes(), b...))
for len(c.writebuf) > 32 {
util.PutBytes(c.writebuf[0])
c.writebuf = c.writebuf[1:]
}
return len(b), nil
} else {
// This triggers some session keepalive traffic
// FIXME this desparately needs to be refactored, since the ping case needlessly goes through the router goroutine just to have it pass a function to the session worker when it determines that a session already exists.
c.core.router.doAdmin(c.startSearch)
}
var packet []byte var packet []byte
done := make(chan struct{}) done := make(chan struct{})
written := len(b) written := len(b)
@ -301,6 +197,34 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
} }
packet = p.encode() packet = p.encode()
sinfo.bytesSent += uint64(len(b)) sinfo.bytesSent += uint64(len(b))
// The rest of this work is session keep-alive traffic
doSearch := func() {
routerWork := func() {
// Check to see if there is a search already matching the destination
sinfo, isIn := c.core.searches.searches[*c.nodeID]
if !isIn {
// Nothing was found, so create a new search
searchCompleted := func(sinfo *sessionInfo, e error) {}
sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo)
}
// Continue the search
sinfo.continueSearch()
}
go func() { c.core.router.admin <- routerWork }()
}
switch {
case !sinfo.init:
sinfo.core.sessions.ping(sinfo)
case time.Since(sinfo.time) > 6*time.Second:
if sinfo.time.Before(sinfo.pingTime) && time.Since(sinfo.pingTime) > 6*time.Second {
// TODO double check that the above condition is correct
doSearch()
} else {
sinfo.core.sessions.ping(sinfo)
}
default: // Don't do anything, to keep traffic throttled
}
} }
// Set up a timer so this doesn't block forever // Set up a timer so this doesn't block forever
timer := getDeadlineTimer(&c.writeDeadline) timer := getDeadlineTimer(&c.writeDeadline)
@ -327,7 +251,6 @@ func (c *Conn) Close() error {
if c.session != nil { if c.session != nil {
// Close the session, if it hasn't been closed already // Close the session, if it hasn't been closed already
c.session.close() c.session.close()
c.session = nil
} }
// This can't fail yet - TODO? // This can't fail yet - TODO?
c.closed = true c.closed = true

View File

@ -68,9 +68,9 @@ type dht struct {
core *Core core *Core
reconfigure chan chan error reconfigure chan chan error
nodeID crypto.NodeID nodeID crypto.NodeID
peers chan *dhtInfo // other goroutines put incoming dht updates here peers chan *dhtInfo // other goroutines put incoming dht updates here
reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests
callbacks map[dhtReqKey]dht_callbackInfo // Search and admin lookup callbacks callbacks map[dhtReqKey][]dht_callbackInfo // Search and admin lookup callbacks
// These next two could be replaced by a single linked list or similar... // These next two could be replaced by a single linked list or similar...
table map[crypto.NodeID]*dhtInfo table map[crypto.NodeID]*dhtInfo
imp []*dhtInfo imp []*dhtInfo
@ -88,7 +88,7 @@ func (t *dht) init(c *Core) {
}() }()
t.nodeID = *t.core.NodeID() t.nodeID = *t.core.NodeID()
t.peers = make(chan *dhtInfo, 1024) t.peers = make(chan *dhtInfo, 1024)
t.callbacks = make(map[dhtReqKey]dht_callbackInfo) t.callbacks = make(map[dhtReqKey][]dht_callbackInfo)
t.reset() t.reset()
} }
@ -244,15 +244,17 @@ type dht_callbackInfo struct {
// Adds a callback and removes it after some timeout. // Adds a callback and removes it after some timeout.
func (t *dht) addCallback(rq *dhtReqKey, callback func(*dhtRes)) { func (t *dht) addCallback(rq *dhtReqKey, callback func(*dhtRes)) {
info := dht_callbackInfo{callback, time.Now().Add(6 * time.Second)} info := dht_callbackInfo{callback, time.Now().Add(6 * time.Second)}
t.callbacks[*rq] = info t.callbacks[*rq] = append(t.callbacks[*rq], info)
} }
// Reads a lookup response, checks that we had sent a matching request, and processes the response info. // Reads a lookup response, checks that we had sent a matching request, and processes the response info.
// This mainly consists of updating the node we asked in our DHT (they responded, so we know they're still alive), and deciding if we want to do anything with their responses // This mainly consists of updating the node we asked in our DHT (they responded, so we know they're still alive), and deciding if we want to do anything with their responses
func (t *dht) handleRes(res *dhtRes) { func (t *dht) handleRes(res *dhtRes) {
rq := dhtReqKey{res.Key, res.Dest} rq := dhtReqKey{res.Key, res.Dest}
if callback, isIn := t.callbacks[rq]; isIn { if callbacks, isIn := t.callbacks[rq]; isIn {
callback.f(res) for _, callback := range callbacks {
callback.f(res)
}
delete(t.callbacks, rq) delete(t.callbacks, rq)
} }
_, isIn := t.reqs[rq] _, isIn := t.reqs[rq]
@ -326,10 +328,15 @@ func (t *dht) doMaintenance() {
} }
} }
t.reqs = newReqs t.reqs = newReqs
newCallbacks := make(map[dhtReqKey]dht_callbackInfo, len(t.callbacks)) newCallbacks := make(map[dhtReqKey][]dht_callbackInfo, len(t.callbacks))
for key, callback := range t.callbacks { for key, cs := range t.callbacks {
if now.Before(callback.time) { for _, c := range cs {
newCallbacks[key] = callback if now.Before(c.time) {
newCallbacks[key] = append(newCallbacks[key], c)
} else {
// Signal failure
c.f(nil)
}
} }
} }
t.callbacks = newCallbacks t.callbacks = newCallbacks

View File

@ -14,6 +14,8 @@ type Dialer struct {
core *Core core *Core
} }
// TODO DialContext that allows timeouts/cancellation, Dial should just call this with no timeout set in the context
// Dial opens a session to the given node. The first paramter should be "nodeid" // Dial opens a session to the given node. The first paramter should be "nodeid"
// and the second parameter should contain a hexadecimal representation of the // and the second parameter should contain a hexadecimal representation of the
// target node ID. // target node ID.
@ -58,5 +60,8 @@ func (d *Dialer) Dial(network, address string) (*Conn, error) {
// NodeID parameters. // NodeID parameters.
func (d *Dialer) DialByNodeIDandMask(nodeID, nodeMask *crypto.NodeID) (*Conn, error) { func (d *Dialer) DialByNodeIDandMask(nodeID, nodeMask *crypto.NodeID) (*Conn, error) {
conn := newConn(d.core, nodeID, nodeMask, nil) conn := newConn(d.core, nodeID, nodeMask, nil)
if err := conn.search(); err != nil {
return nil, err
}
return conn, nil return conn, nil
} }

View File

@ -33,13 +33,14 @@ const search_RETRY_TIME = time.Second
// Information about an ongoing search. // Information about an ongoing search.
// Includes the target NodeID, the bitmask to match it to an IP, and the list of nodes to visit / already visited. // Includes the target NodeID, the bitmask to match it to an IP, and the list of nodes to visit / already visited.
type searchInfo struct { type searchInfo struct {
core *Core
dest crypto.NodeID dest crypto.NodeID
mask crypto.NodeID mask crypto.NodeID
time time.Time time time.Time
packet []byte
toVisit []*dhtInfo toVisit []*dhtInfo
visited map[crypto.NodeID]bool visited map[crypto.NodeID]bool
callback func(*sessionInfo, error) callback func(*sessionInfo, error)
// TODO context.Context for timeout and cancellation
} }
// This stores a map of active searches. // This stores a map of active searches.
@ -49,7 +50,7 @@ type searches struct {
searches map[crypto.NodeID]*searchInfo searches map[crypto.NodeID]*searchInfo
} }
// Intializes the searches struct. // Initializes the searches struct.
func (s *searches) init(core *Core) { func (s *searches) init(core *Core) {
s.core = core s.core = core
s.reconfigure = make(chan chan error, 1) s.reconfigure = make(chan chan error, 1)
@ -65,12 +66,13 @@ func (s *searches) init(core *Core) {
// Creates a new search info, adds it to the searches struct, and returns a pointer to the info. // Creates a new search info, adds it to the searches struct, and returns a pointer to the info.
func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) *searchInfo { func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) *searchInfo {
now := time.Now() now := time.Now()
for dest, sinfo := range s.searches { //for dest, sinfo := range s.searches {
if now.Sub(sinfo.time) > time.Minute { // if now.Sub(sinfo.time) > time.Minute {
delete(s.searches, dest) // delete(s.searches, dest)
} // }
} //}
info := searchInfo{ info := searchInfo{
core: s.core,
dest: *dest, dest: *dest,
mask: *mask, mask: *mask,
time: now.Add(-time.Second), time: now.Add(-time.Second),
@ -82,30 +84,29 @@ func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID, callba
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// Checks if there's an ongoing search relaed to a dhtRes. // Checks if there's an ongoing search related to a dhtRes.
// If there is, it adds the response info to the search and triggers a new search step. // If there is, it adds the response info to the search and triggers a new search step.
// If there's no ongoing search, or we if the dhtRes finished the search (it was from the target node), then don't do anything more. // If there's no ongoing search, or we if the dhtRes finished the search (it was from the target node), then don't do anything more.
func (s *searches) handleDHTRes(res *dhtRes) { func (sinfo *searchInfo) handleDHTRes(res *dhtRes) {
sinfo, isIn := s.searches[res.Dest] if res == nil || sinfo.checkDHTRes(res) {
if !isIn || s.checkDHTRes(sinfo, res) {
// Either we don't recognize this search, or we just finished it // Either we don't recognize this search, or we just finished it
return return
} }
// Add to the search and continue // Add to the search and continue
s.addToSearch(sinfo, res) sinfo.addToSearch(res)
s.doSearchStep(sinfo) sinfo.doSearchStep()
} }
// Adds the information from a dhtRes to an ongoing search. // Adds the information from a dhtRes to an ongoing search.
// Info about a node that has already been visited is not re-added to the search. // Info about a node that has already been visited is not re-added to the search.
// Duplicate information about nodes toVisit is deduplicated (the newest information is kept). // Duplicate information about nodes toVisit is deduplicated (the newest information is kept).
// The toVisit list is sorted in ascending order of keyspace distance from the destination. // The toVisit list is sorted in ascending order of keyspace distance from the destination.
func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) { func (sinfo *searchInfo) addToSearch(res *dhtRes) {
// Add responses to toVisit if closer to dest than the res node // Add responses to toVisit if closer to dest than the res node
from := dhtInfo{key: res.Key, coords: res.Coords} from := dhtInfo{key: res.Key, coords: res.Coords}
sinfo.visited[*from.getNodeID()] = true sinfo.visited[*from.getNodeID()] = true
for _, info := range res.Infos { for _, info := range res.Infos {
if *info.getNodeID() == s.core.dht.nodeID || sinfo.visited[*info.getNodeID()] { if *info.getNodeID() == sinfo.core.dht.nodeID || sinfo.visited[*info.getNodeID()] {
continue continue
} }
if dht_ordered(&sinfo.dest, info.getNodeID(), from.getNodeID()) { if dht_ordered(&sinfo.dest, info.getNodeID(), from.getNodeID()) {
@ -135,42 +136,43 @@ func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) {
// If there are no nodes left toVisit, then this cleans up the search. // If there are no nodes left toVisit, then this cleans up the search.
// Otherwise, it pops the closest node to the destination (in keyspace) off of the toVisit list and sends a dht ping. // Otherwise, it pops the closest node to the destination (in keyspace) off of the toVisit list and sends a dht ping.
func (s *searches) doSearchStep(sinfo *searchInfo) { func (sinfo *searchInfo) doSearchStep() {
if len(sinfo.toVisit) == 0 { if len(sinfo.toVisit) == 0 {
// Dead end, do cleanup // Dead end, do cleanup
delete(s.searches, sinfo.dest) delete(sinfo.core.searches.searches, sinfo.dest)
go sinfo.callback(nil, errors.New("search reached dead end")) sinfo.callback(nil, errors.New("search reached dead end"))
return return
} }
// Send to the next search target // Send to the next search target
var next *dhtInfo var next *dhtInfo
next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:] next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:]
rq := dhtReqKey{next.key, sinfo.dest} rq := dhtReqKey{next.key, sinfo.dest}
s.core.dht.addCallback(&rq, s.handleDHTRes) sinfo.core.dht.addCallback(&rq, sinfo.handleDHTRes)
s.core.dht.ping(next, &sinfo.dest) sinfo.core.dht.ping(next, &sinfo.dest)
} }
// If we've recenty sent a ping for this search, do nothing. // If we've recenty sent a ping for this search, do nothing.
// Otherwise, doSearchStep and schedule another continueSearch to happen after search_RETRY_TIME. // Otherwise, doSearchStep and schedule another continueSearch to happen after search_RETRY_TIME.
func (s *searches) continueSearch(sinfo *searchInfo) { func (sinfo *searchInfo) continueSearch() {
if time.Since(sinfo.time) < search_RETRY_TIME { if time.Since(sinfo.time) < search_RETRY_TIME {
return return
} }
sinfo.time = time.Now() sinfo.time = time.Now()
s.doSearchStep(sinfo) sinfo.doSearchStep()
// In case the search dies, try to spawn another thread later // In case the search dies, try to spawn another thread later
// Note that this will spawn multiple parallel searches as time passes // Note that this will spawn multiple parallel searches as time passes
// Any that die aren't restarted, but a new one will start later // Any that die aren't restarted, but a new one will start later
retryLater := func() { retryLater := func() {
newSearchInfo := s.searches[sinfo.dest] // FIXME this keeps the search alive forever if not for the searches map, fix that
newSearchInfo := sinfo.core.searches.searches[sinfo.dest]
if newSearchInfo != sinfo { if newSearchInfo != sinfo {
return return
} }
s.continueSearch(sinfo) sinfo.continueSearch()
} }
go func() { go func() {
time.Sleep(search_RETRY_TIME) time.Sleep(search_RETRY_TIME)
s.core.router.admin <- retryLater sinfo.core.router.admin <- retryLater
}() }()
} }
@ -185,37 +187,36 @@ func (s *searches) newIterSearch(dest *crypto.NodeID, mask *crypto.NodeID, callb
// Checks if a dhtRes is good (called by handleDHTRes). // Checks if a dhtRes is good (called by handleDHTRes).
// If the response is from the target, get/create a session, trigger a session ping, and return true. // If the response is from the target, get/create a session, trigger a session ping, and return true.
// Otherwise return false. // Otherwise return false.
func (s *searches) checkDHTRes(info *searchInfo, res *dhtRes) bool { func (sinfo *searchInfo) checkDHTRes(res *dhtRes) bool {
them := crypto.GetNodeID(&res.Key) them := crypto.GetNodeID(&res.Key)
var destMasked crypto.NodeID var destMasked crypto.NodeID
var themMasked crypto.NodeID var themMasked crypto.NodeID
for idx := 0; idx < crypto.NodeIDLen; idx++ { for idx := 0; idx < crypto.NodeIDLen; idx++ {
destMasked[idx] = info.dest[idx] & info.mask[idx] destMasked[idx] = sinfo.dest[idx] & sinfo.mask[idx]
themMasked[idx] = them[idx] & info.mask[idx] themMasked[idx] = them[idx] & sinfo.mask[idx]
} }
if themMasked != destMasked { if themMasked != destMasked {
return false return false
} }
// They match, so create a session and send a sessionRequest // They match, so create a session and send a sessionRequest
sinfo, isIn := s.core.sessions.getByTheirPerm(&res.Key) sess, isIn := sinfo.core.sessions.getByTheirPerm(&res.Key)
if !isIn { if !isIn {
sinfo = s.core.sessions.createSession(&res.Key) sess = sinfo.core.sessions.createSession(&res.Key)
if sinfo == nil { if sess == nil {
// nil if the DHT search finished but the session wasn't allowed // nil if the DHT search finished but the session wasn't allowed
go info.callback(nil, errors.New("session not allowed")) sinfo.callback(nil, errors.New("session not allowed"))
return true return true
} }
_, isIn := s.core.sessions.getByTheirPerm(&res.Key) _, isIn := sinfo.core.sessions.getByTheirPerm(&res.Key)
if !isIn { if !isIn {
panic("This should never happen") panic("This should never happen")
} }
} }
// FIXME (!) replay attacks could mess with coords? Give it a handle (tstamp)? // FIXME (!) replay attacks could mess with coords? Give it a handle (tstamp)?
sinfo.coords = res.Coords sess.coords = res.Coords
sinfo.packet = info.packet sinfo.core.sessions.ping(sess)
s.core.sessions.ping(sinfo) sinfo.callback(sess, nil)
go info.callback(sinfo, nil)
// Cleanup // Cleanup
delete(s.searches, res.Dest) delete(sinfo.core.searches.searches, res.Dest)
return true return true
} }

View File

@ -39,7 +39,6 @@ type sessionInfo struct {
pingTime time.Time // time the first ping was sent since the last received packet pingTime time.Time // time the first ping was sent since the last received packet
pingSend time.Time // time the last ping was sent pingSend time.Time // time the last ping was sent
coords []byte // coords of destination coords []byte // coords of destination
packet []byte // a buffered packet, sent immediately on ping/pong
init bool // Reset if coords change init bool // Reset if coords change
tstamp int64 // ATOMIC - tstamp from their last session ping, replay attack mitigation tstamp int64 // ATOMIC - tstamp from their last session ping, replay attack mitigation
bytesSent uint64 // Bytes of real traffic sent in this session bytesSent uint64 // Bytes of real traffic sent in this session
@ -118,12 +117,8 @@ type sessions struct {
isAllowedHandler func(pubkey *crypto.BoxPubKey, initiator bool) bool // Returns true or false if session setup is allowed isAllowedHandler func(pubkey *crypto.BoxPubKey, initiator bool) bool // Returns true or false if session setup is allowed
isAllowedMutex sync.RWMutex // Protects the above isAllowedMutex sync.RWMutex // Protects the above
permShared map[crypto.BoxPubKey]*crypto.BoxSharedKey // Maps known permanent keys to their shared key, used by DHT a lot permShared map[crypto.BoxPubKey]*crypto.BoxSharedKey // Maps known permanent keys to their shared key, used by DHT a lot
sinfos map[crypto.Handle]*sessionInfo // Maps (secret) handle onto session info sinfos map[crypto.Handle]*sessionInfo // Maps handle onto session info
conns map[crypto.Handle]*Conn // Maps (secret) handle onto connections
byMySes map[crypto.BoxPubKey]*crypto.Handle // Maps mySesPub onto handle
byTheirPerm map[crypto.BoxPubKey]*crypto.Handle // Maps theirPermPub onto handle byTheirPerm map[crypto.BoxPubKey]*crypto.Handle // Maps theirPermPub onto handle
addrToPerm map[address.Address]*crypto.BoxPubKey
subnetToPerm map[address.Subnet]*crypto.BoxPubKey
} }
// Initializes the session struct. // Initializes the session struct.
@ -149,10 +144,7 @@ func (ss *sessions) init(core *Core) {
}() }()
ss.permShared = make(map[crypto.BoxPubKey]*crypto.BoxSharedKey) ss.permShared = make(map[crypto.BoxPubKey]*crypto.BoxSharedKey)
ss.sinfos = make(map[crypto.Handle]*sessionInfo) ss.sinfos = make(map[crypto.Handle]*sessionInfo)
ss.byMySes = make(map[crypto.BoxPubKey]*crypto.Handle)
ss.byTheirPerm = make(map[crypto.BoxPubKey]*crypto.Handle) ss.byTheirPerm = make(map[crypto.BoxPubKey]*crypto.Handle)
ss.addrToPerm = make(map[address.Address]*crypto.BoxPubKey)
ss.subnetToPerm = make(map[address.Subnet]*crypto.BoxPubKey)
ss.lastCleanup = time.Now() ss.lastCleanup = time.Now()
} }
@ -175,16 +167,6 @@ func (ss *sessions) getSessionForHandle(handle *crypto.Handle) (*sessionInfo, bo
return sinfo, isIn return sinfo, isIn
} }
// Gets a session corresponding to an ephemeral session key used by this node.
func (ss *sessions) getByMySes(key *crypto.BoxPubKey) (*sessionInfo, bool) {
h, isIn := ss.byMySes[*key]
if !isIn {
return nil, false
}
sinfo, isIn := ss.getSessionForHandle(h)
return sinfo, isIn
}
// Gets a session corresponding to a permanent key used by the remote node. // Gets a session corresponding to a permanent key used by the remote node.
func (ss *sessions) getByTheirPerm(key *crypto.BoxPubKey) (*sessionInfo, bool) { func (ss *sessions) getByTheirPerm(key *crypto.BoxPubKey) (*sessionInfo, bool) {
h, isIn := ss.byTheirPerm[*key] h, isIn := ss.byTheirPerm[*key]
@ -195,26 +177,6 @@ func (ss *sessions) getByTheirPerm(key *crypto.BoxPubKey) (*sessionInfo, bool) {
return sinfo, isIn return sinfo, isIn
} }
// Gets a session corresponding to an IPv6 address used by the remote node.
func (ss *sessions) getByTheirAddr(addr *address.Address) (*sessionInfo, bool) {
p, isIn := ss.addrToPerm[*addr]
if !isIn {
return nil, false
}
sinfo, isIn := ss.getByTheirPerm(p)
return sinfo, isIn
}
// Gets a session corresponding to an IPv6 /64 subnet used by the remote node/network.
func (ss *sessions) getByTheirSubnet(snet *address.Subnet) (*sessionInfo, bool) {
p, isIn := ss.subnetToPerm[*snet]
if !isIn {
return nil, false
}
sinfo, isIn := ss.getByTheirPerm(p)
return sinfo, isIn
}
// Creates a new session and lazily cleans up old existing sessions. This // Creates a new session and lazily cleans up old existing sessions. This
// includse initializing session info to sane defaults (e.g. lowest supported // includse initializing session info to sane defaults (e.g. lowest supported
// MTU). // MTU).
@ -263,10 +225,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
sinfo.worker = make(chan func(), 1) sinfo.worker = make(chan func(), 1)
sinfo.recv = make(chan *wire_trafficPacket, 32) sinfo.recv = make(chan *wire_trafficPacket, 32)
ss.sinfos[sinfo.myHandle] = &sinfo ss.sinfos[sinfo.myHandle] = &sinfo
ss.byMySes[sinfo.mySesPub] = &sinfo.myHandle
ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle
ss.addrToPerm[sinfo.theirAddr] = &sinfo.theirPermPub
ss.subnetToPerm[sinfo.theirSubnet] = &sinfo.theirPermPub
go sinfo.workerMain() go sinfo.workerMain()
return &sinfo return &sinfo
} }
@ -291,36 +250,18 @@ func (ss *sessions) cleanup() {
sinfos[k] = v sinfos[k] = v
} }
ss.sinfos = sinfos ss.sinfos = sinfos
byMySes := make(map[crypto.BoxPubKey]*crypto.Handle, len(ss.byMySes))
for k, v := range ss.byMySes {
byMySes[k] = v
}
ss.byMySes = byMySes
byTheirPerm := make(map[crypto.BoxPubKey]*crypto.Handle, len(ss.byTheirPerm)) byTheirPerm := make(map[crypto.BoxPubKey]*crypto.Handle, len(ss.byTheirPerm))
for k, v := range ss.byTheirPerm { for k, v := range ss.byTheirPerm {
byTheirPerm[k] = v byTheirPerm[k] = v
} }
ss.byTheirPerm = byTheirPerm ss.byTheirPerm = byTheirPerm
addrToPerm := make(map[address.Address]*crypto.BoxPubKey, len(ss.addrToPerm))
for k, v := range ss.addrToPerm {
addrToPerm[k] = v
}
ss.addrToPerm = addrToPerm
subnetToPerm := make(map[address.Subnet]*crypto.BoxPubKey, len(ss.subnetToPerm))
for k, v := range ss.subnetToPerm {
subnetToPerm[k] = v
}
ss.subnetToPerm = subnetToPerm
ss.lastCleanup = time.Now() ss.lastCleanup = time.Now()
} }
// Closes a session, removing it from sessions maps and killing the worker goroutine. // Closes a session, removing it from sessions maps and killing the worker goroutine.
func (sinfo *sessionInfo) close() { func (sinfo *sessionInfo) close() {
delete(sinfo.core.sessions.sinfos, sinfo.myHandle) delete(sinfo.core.sessions.sinfos, sinfo.myHandle)
delete(sinfo.core.sessions.byMySes, sinfo.mySesPub)
delete(sinfo.core.sessions.byTheirPerm, sinfo.theirPermPub) delete(sinfo.core.sessions.byTheirPerm, sinfo.theirPermPub)
delete(sinfo.core.sessions.addrToPerm, sinfo.theirAddr)
delete(sinfo.core.sessions.subnetToPerm, sinfo.theirSubnet)
close(sinfo.worker) close(sinfo.worker)
} }
@ -383,8 +324,8 @@ func (ss *sessions) sendPingPong(sinfo *sessionInfo, isPong bool) {
} }
packet := p.encode() packet := p.encode()
ss.core.router.out(packet) ss.core.router.out(packet)
if !isPong { if sinfo.pingTime.Before(sinfo.time) {
sinfo.pingSend = time.Now() sinfo.pingTime = time.Now()
} }
} }
@ -425,15 +366,6 @@ func (ss *sessions) handlePing(ping *sessionPing) {
if !ping.IsPong { if !ping.IsPong {
ss.sendPingPong(sinfo, true) ss.sendPingPong(sinfo, true)
} }
if sinfo.packet != nil {
/* FIXME this needs to live in the net.Conn or something, needs work in Write
// send
var bs []byte
bs, sinfo.packet = sinfo.packet, nil
ss.core.router.sendPacket(bs) // FIXME this needs to live in the net.Conn or something, needs work in Write
*/
sinfo.packet = nil
}
}) })
} }