From b20c8b6da5029d49d8beb74f204a90eb2e91e8b5 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 19 Apr 2019 00:11:43 +0100 Subject: [PATCH] Move some things around a bit, delete session workers --- src/yggdrasil/{api.go => conn.go} | 29 ------- src/yggdrasil/core.go | 32 ++++++++ src/yggdrasil/session.go | 130 ------------------------------ 3 files changed, 32 insertions(+), 159 deletions(-) rename src/yggdrasil/{api.go => conn.go} (85%) diff --git a/src/yggdrasil/api.go b/src/yggdrasil/conn.go similarity index 85% rename from src/yggdrasil/api.go rename to src/yggdrasil/conn.go index 41ef8c0..0ce626c 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/conn.go @@ -9,35 +9,6 @@ import ( "github.com/yggdrasil-network/yggdrasil-go/src/util" ) -func (c *Core) Dial(network, address string) (Conn, error) { - conn := Conn{} - nodeID := crypto.NodeID{} - nodeMask := crypto.NodeID{} - // Process - switch network { - case "nodeid": - // A node ID was provided - we don't need to do anything special with it - dest, err := hex.DecodeString(address) - if err != nil { - return Conn{}, err - } - copy(nodeID[:], dest) - for i := range nodeMask { - nodeMask[i] = 0xFF - } - default: - // An unexpected address type was given, so give up - return Conn{}, errors.New("unexpected address type") - } - conn.core = c - conn.nodeID = &nodeID - conn.nodeMask = &nodeMask - conn.core.router.doAdmin(func() { - conn.startSearch() - }) - return conn, nil -} - type Conn struct { core *Core nodeID *crypto.NodeID diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index 037ef09..22caf08 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -254,6 +254,38 @@ func (c *Core) Stop() { c.admin.close() } +// Dial opens a session to the given node. The first paramter should be "nodeid" +// and the second parameter should contain a hexadecimal representation of the +// target node ID. +func (c *Core) Dial(network, address string) (Conn, error) { + conn := Conn{} + nodeID := crypto.NodeID{} + nodeMask := crypto.NodeID{} + // Process + switch network { + case "nodeid": + // A node ID was provided - we don't need to do anything special with it + dest, err := hex.DecodeString(address) + if err != nil { + return Conn{}, err + } + copy(nodeID[:], dest) + for i := range nodeMask { + nodeMask[i] = 0xFF + } + default: + // An unexpected address type was given, so give up + return Conn{}, errors.New("unexpected address type") + } + conn.core = c + conn.nodeID = &nodeID + conn.nodeMask = &nodeMask + conn.core.router.doAdmin(func() { + conn.startSearch() + }) + return conn, nil +} + // ListenTCP starts a new TCP listener. The input URI should match that of the // "Listen" configuration item, e.g. // tcp://a.b.c.d:e diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 4c896f2..a183545 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -12,7 +12,6 @@ import ( "github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" - "github.com/yggdrasil-network/yggdrasil-go/src/util" ) // All the information we know about an active session. @@ -307,7 +306,6 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) sinfo.send = make(chan []byte, 32) sinfo.recv = make(chan *wire_trafficPacket, 32) - go sinfo.doWorker() ss.sinfos[sinfo.myHandle] = &sinfo ss.byMySes[sinfo.mySesPub] = &sinfo.myHandle ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle @@ -521,131 +519,3 @@ func (ss *sessions) resetInits() { sinfo.init = false } } - -//////////////////////////////////////////////////////////////////////////////// -/* -// This is for a per-session worker. -// It handles calling the relatively expensive crypto operations. -// It's also responsible for checking nonces and dropping out-of-date/duplicate packets, or else calling the function to update nonces if the packet is OK. -func (sinfo *sessionInfo) doWorker() { - send := make(chan []byte, 32) - defer close(send) - go func() { - for bs := range send { - sinfo.doSend(bs) - } - }() - recv := make(chan *wire_trafficPacket, 32) - defer close(recv) - go func() { - for p := range recv { - sinfo.doRecv(p) - } - }() - for { - select { - case p, ok := <-sinfo.recv: - if ok { - select { - case recv <- p: - default: - // We need something to not block, and it's best to drop it before we decrypt - util.PutBytes(p.Payload) - } - } else { - return - } - case bs, ok := <-sinfo.send: - if ok { - send <- bs - } else { - return - } - case e := <-sinfo.reconfigure: - e <- nil - } - } -} - -// This encrypts a packet, creates a trafficPacket struct, encodes it, and sends it to router.out to pass it to the switch layer. -func (sinfo *sessionInfo) doSend(bs []byte) { - defer util.PutBytes(bs) - if !sinfo.init { - // To prevent using empty session keys - return - } - // code isn't multithreaded so appending to this is safe - coords := sinfo.coords - // Work out the flowkey - this is used to determine which switch queue - // traffic will be pushed to in the event of congestion - var flowkey uint64 - // Get the IP protocol version from the packet - switch bs[0] & 0xf0 { - case 0x40: // IPv4 packet - // Check the packet meets minimum UDP packet length - if len(bs) >= 24 { - // Is the protocol TCP, UDP or SCTP? - if bs[9] == 0x06 || bs[9] == 0x11 || bs[9] == 0x84 { - ihl := bs[0] & 0x0f * 4 // Header length - flowkey = uint64(bs[9])<<32 /* proto */ | - uint64(bs[ihl+0])<<24 | uint64(bs[ihl+1])<<16 /* sport */ | - uint64(bs[ihl+2])<<8 | uint64(bs[ihl+3]) /* dport */ - } - } - case 0x60: // IPv6 packet - // Check if the flowlabel was specified in the packet header - flowkey = uint64(bs[1]&0x0f)<<16 | uint64(bs[2])<<8 | uint64(bs[3]) - // If the flowlabel isn't present, make protokey from proto | sport | dport - // if the packet meets minimum UDP packet length - if flowkey == 0 && len(bs) >= 48 { - // Is the protocol TCP, UDP or SCTP? - if bs[6] == 0x06 || bs[6] == 0x11 || bs[6] == 0x84 { - flowkey = uint64(bs[6])<<32 /* proto */ | - uint64(bs[40])<<24 | uint64(bs[41])<<16 /* sport */ | - uint64(bs[42])<<8 | uint64(bs[43]) /* dport */ - } - } - } - // If we have a flowkey, either through the IPv6 flowlabel field or through - // known TCP/UDP/SCTP proto-sport-dport triplet, then append it to the coords. - // Appending extra coords after a 0 ensures that we still target the local router - // but lets us send extra data (which is otherwise ignored) to help separate - // traffic streams into independent queues - if flowkey != 0 { - coords = append(coords, 0) // First target the local switchport - coords = wire_put_uint64(flowkey, coords) // Then variable-length encoded flowkey - } - // Prepare the payload - payload, nonce := crypto.BoxSeal(&sinfo.sharedSesKey, bs, &sinfo.myNonce) - defer util.PutBytes(payload) - p := wire_trafficPacket{ - Coords: coords, - Handle: sinfo.theirHandle, - Nonce: *nonce, - Payload: payload, - } - packet := p.encode() - sinfo.bytesSent += uint64(len(bs)) - sinfo.core.router.out(packet) -} - -// This takes a trafficPacket and checks the nonce. -// If the nonce is OK, it decrypts the packet. -// If the decrypted packet is OK, it calls router.recvPacket to pass the packet to the tun/tap. -// If a packet does not decrypt successfully, it assumes the packet was truncated, and updates the MTU accordingly. -// TODO? remove the MTU updating part? That should never happen with TCP peers, and the old UDP code that caused it was removed (and if replaced, should be replaced with something that can reliably send messages with an arbitrary size). -func (sinfo *sessionInfo) doRecv(p *wire_trafficPacket) { - defer util.PutBytes(p.Payload) - if !sinfo.nonceIsOK(&p.Nonce) { - return - } - bs, isOK := crypto.BoxOpen(&sinfo.sharedSesKey, p.Payload, &p.Nonce) - if !isOK { - util.PutBytes(bs) - return - } - sinfo.updateNonce(&p.Nonce) - sinfo.time = time.Now() - sinfo.bytesRecvd += uint64(len(bs)) - sinfo.core.router.toRecv <- router_recvPacket{bs, sinfo} -}