From d5031a5cb610cd755039fbf68d013418f77fcb74 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Sat, 15 Dec 2018 00:48:27 +0000 Subject: [PATCH] Metadata exchange without sessions --- src/yggdrasil/admin.go | 52 +++++++++++++++ src/yggdrasil/core.go | 13 ++-- src/yggdrasil/metadata.go | 92 ++++++++++++++++++++++++++ src/yggdrasil/router.go | 2 +- src/yggdrasil/session.go | 136 +++++++++----------------------------- src/yggdrasil/wire.go | 5 +- 6 files changed, 187 insertions(+), 113 deletions(-) create mode 100644 src/yggdrasil/metadata.go diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go index 266d5a8..1bf6af0 100644 --- a/src/yggdrasil/admin.go +++ b/src/yggdrasil/admin.go @@ -322,6 +322,14 @@ func (a *admin) init(c *Core, listenaddr string) { return admin_info{}, err } }) + a.addHandler("getMeta", []string{"box_pub_key", "coords"}, func(in admin_info) (admin_info, error) { + result, err := a.admin_getMeta(in["box_pub_key"].(string), in["coords"].(string)) + if err == nil { + return admin_info{"metadata": string(result)}, nil + } else { + return admin_info{}, err + } + }) } // start runs the admin API socket to listen for / respond to admin API calls. @@ -806,6 +814,50 @@ func (a *admin) admin_dhtPing(keyString, coordString, targetString string) (dhtR return dhtRes{}, errors.New(fmt.Sprintf("DHT ping timeout: %s", keyString)) } +func (a *admin) admin_getMeta(keyString, coordString string) (metadataPayload, error) { + var key boxPubKey + if keyBytes, err := hex.DecodeString(keyString); err != nil { + return metadataPayload{}, err + } else { + copy(key[:], keyBytes) + } + var coords []byte + for _, cstr := range strings.Split(strings.Trim(coordString, "[]"), " ") { + if cstr == "" { + // Special case, happens if trimmed is the empty string, e.g. this is the root + continue + } + if u64, err := strconv.ParseUint(cstr, 10, 8); err != nil { + return metadataPayload{}, err + } else { + coords = append(coords, uint8(u64)) + } + } + response := make(chan *metadataPayload, 1) + sendMetaRequest := func() { + a.core.metadata.callbacks[key] = metadataCallback{ + created: time.Now(), + call: func(meta *metadataPayload) { + defer func() { recover() }() + select { + case response <- meta: + default: + } + }, + } + a.core.metadata.sendMetadata(key, coords, false) + } + a.core.router.doAdmin(sendMetaRequest) + go func() { + time.Sleep(6 * time.Second) + close(response) + }() + for res := range response { + return *res, nil + } + return metadataPayload{}, errors.New(fmt.Sprintf("getMeta timeout: %s", keyString)) +} + // getResponse_dot returns a response for a graphviz dot formatted representation of the known parts of the network. // This is color-coded and labeled, and includes the self node, switch peers, nodes known to the DHT, and nodes with open sessions. // The graph is structured as a tree with directed links leading away from the root. diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index 72a11d7..7a3564d 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -32,6 +32,7 @@ type Core struct { admin admin searches searches multicast multicast + metadata metadata tcp tcpInterface log *log.Logger ifceExpr []*regexp.Regexp // the zone of link-local IPv6 peers must match this @@ -124,6 +125,9 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error { c.init(&boxPub, &boxPriv, &sigPub, &sigPriv) c.admin.init(c, nc.AdminListen) + c.metadata.init(c) + c.metadata.setMetadata(metadataPayload("HIYA, THIS IS METADATA")) + if err := c.tcp.init(c, nc.Listen, nc.ReadTimeout); err != nil { c.log.Println("Failed to start TCP interface") return err @@ -138,7 +142,6 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error { return err } - c.sessions.setMetadata(metadata("HIYA, THIS IS METADATA")) c.sessions.setSessionFirewallState(nc.SessionFirewall.Enable) c.sessions.setSessionFirewallDefaults( nc.SessionFirewall.AllowFromDirect, @@ -241,13 +244,13 @@ func (c *Core) GetSubnet() *net.IPNet { } // Gets the node metadata. -func (c *Core) GetMetadata() metadata { - return c.sessions.getMetadata() +func (c *Core) GetMetadata() metadataPayload { + return c.metadata.getMetadata() } // Sets the node metadata. -func (c *Core) SetMetadata(meta metadata) { - c.sessions.setMetadata(meta) +func (c *Core) SetMetadata(meta metadataPayload) { + c.metadata.setMetadata(meta) } // Sets the output logger of the Yggdrasil node after startup. This may be diff --git a/src/yggdrasil/metadata.go b/src/yggdrasil/metadata.go new file mode 100644 index 0000000..7f607ed --- /dev/null +++ b/src/yggdrasil/metadata.go @@ -0,0 +1,92 @@ +package yggdrasil + +import ( + "sync" + "time" +) + +type metadata struct { + core *Core + myMetadata metadataPayload + myMetadataMutex sync.RWMutex + callbacks map[boxPubKey]metadataCallback + cache map[boxPubKey]metadataPayload +} + +type metadataPayload []byte + +type metadataCallback struct { + call func(meta *metadataPayload) + created time.Time +} + +// Initialises the metadata cache/callback stuff +func (m *metadata) init(core *Core) { + m.core = core + m.callbacks = make(map[boxPubKey]metadataCallback) + m.cache = make(map[boxPubKey]metadataPayload) + + go func() { + for { + for boxPubKey, callback := range m.callbacks { + if time.Since(callback.created) > time.Minute { + delete(m.callbacks, boxPubKey) + } + } + time.Sleep(time.Second * 5) + } + }() +} + +// Handles the callback, if there is one +func (m *metadata) callback(sender boxPubKey, meta metadataPayload) { + if callback, ok := m.callbacks[sender]; ok { + callback.call(&meta) + delete(m.callbacks, sender) + } +} + +// Get the metadata +func (m *metadata) getMetadata() metadataPayload { + m.myMetadataMutex.RLock() + defer m.myMetadataMutex.RUnlock() + return m.myMetadata +} + +// Set the metadata +func (m *metadata) setMetadata(meta metadataPayload) { + m.myMetadataMutex.Lock() + defer m.myMetadataMutex.Unlock() + m.myMetadata = meta +} + +// Handles a meta request/response. +func (m *metadata) handleMetadata(meta *sessionMeta) { + if meta.IsResponse { + m.core.metadata.callback(meta.SendPermPub, meta.Metadata) + } else { + m.sendMetadata(meta.SendPermPub, meta.SendCoords, true) + } +} + +// Send metadata request or response +func (m *metadata) sendMetadata(key boxPubKey, coords []byte, isResponse bool) { + table := m.core.switchTable.table.Load().(lookupTable) + meta := sessionMeta{ + SendCoords: table.self.getCoords(), + IsResponse: isResponse, + Metadata: m.core.metadata.getMetadata(), + } + bs := meta.encode() + shared := m.core.sessions.getSharedKey(&m.core.boxPriv, &key) + payload, nonce := boxSeal(shared, bs, nil) + p := wire_protoTrafficPacket{ + Coords: coords, + ToKey: key, + FromKey: m.core.boxPub, + Nonce: *nonce, + Payload: payload, + } + packet := p.encode() + m.core.router.out(packet) +} diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 4914a6b..e6f6b0f 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -483,7 +483,7 @@ func (r *router) handleMetadata(bs []byte, fromKey *boxPubKey) { return } req.SendPermPub = *fromKey - r.core.sessions.handleMetadata(&req) + r.core.metadata.handleMetadata(&req) } // Passed a function to call. diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 31fe375..418c46a 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -7,44 +7,40 @@ package yggdrasil import ( "bytes" "encoding/hex" - "sync" "time" ) // All the information we know about an active session. // This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API. type sessionInfo struct { - core *Core - theirAddr address - theirSubnet subnet - theirPermPub boxPubKey - theirSesPub boxPubKey - mySesPub boxPubKey - mySesPriv boxPrivKey - sharedSesKey boxSharedKey // derived from session keys - theirHandle handle - myHandle handle - theirNonce boxNonce - myNonce boxNonce - metaReqTime time.Time - metaResTime time.Time - theirMetadata metadata - theirMTU uint16 - myMTU uint16 - wasMTUFixed bool // Was the MTU fixed by a receive error? - time time.Time // Time we last received a packet - coords []byte // coords of destination - packet []byte // a buffered packet, sent immediately on ping/pong - init bool // Reset if coords change - send chan []byte - recv chan *wire_trafficPacket - nonceMask uint64 - tstamp int64 // tstamp from their last session ping, replay attack mitigation - mtuTime time.Time // time myMTU was last changed - pingTime time.Time // time the first ping was sent since the last received packet - pingSend time.Time // time the last ping was sent - bytesSent uint64 // Bytes of real traffic sent in this session - bytesRecvd uint64 // Bytes of real traffic received in this session + core *Core + theirAddr address + theirSubnet subnet + theirPermPub boxPubKey + theirSesPub boxPubKey + mySesPub boxPubKey + mySesPriv boxPrivKey + sharedSesKey boxSharedKey // derived from session keys + theirHandle handle + myHandle handle + theirNonce boxNonce + myNonce boxNonce + theirMTU uint16 + myMTU uint16 + wasMTUFixed bool // Was the MTU fixed by a receive error? + time time.Time // Time we last received a packet + coords []byte // coords of destination + packet []byte // a buffered packet, sent immediately on ping/pong + init bool // Reset if coords change + send chan []byte + recv chan *wire_trafficPacket + nonceMask uint64 + tstamp int64 // tstamp from their last session ping, replay attack mitigation + mtuTime time.Time // time myMTU was last changed + pingTime time.Time // time the first ping was sent since the last received packet + pingSend time.Time // time the last ping was sent + bytesSent uint64 // Bytes of real traffic sent in this session + bytesRecvd uint64 // Bytes of real traffic received in this session } // Represents a session ping/pong packet, andincludes information like public keys, a session handle, coords, a timestamp to prevent replays, and the tun/tap MTU. @@ -61,12 +57,11 @@ type sessionPing struct { // Represents a session metadata packet. type sessionMeta struct { SendPermPub boxPubKey // Sender's permanent key + SendCoords []byte // Sender's coords IsResponse bool - Metadata metadata + Metadata metadataPayload } -type metadata []byte - // Updates session info in response to a ping, after checking that the ping is OK. // Returns true if the session was updated, or false otherwise. func (s *sessionInfo) update(p *sessionPing) bool { @@ -128,9 +123,6 @@ type sessions struct { sessionFirewallAlwaysAllowsOutbound bool sessionFirewallWhitelist []string sessionFirewallBlacklist []string - // Metadata for this node - myMetadata metadata - myMetadataMutex sync.RWMutex } // Initializes the session struct. @@ -145,20 +137,6 @@ func (ss *sessions) init(core *Core) { ss.lastCleanup = time.Now() } -// Get the metadata -func (ss *sessions) getMetadata() metadata { - ss.myMetadataMutex.RLock() - defer ss.myMetadataMutex.RUnlock() - return ss.myMetadata -} - -// Set the metadata -func (ss *sessions) setMetadata(meta metadata) { - ss.myMetadataMutex.Lock() - defer ss.myMetadataMutex.Unlock() - ss.myMetadata = meta -} - // Enable or disable the session firewall func (ss *sessions) setSessionFirewallState(enabled bool) { ss.sessionFirewallEnabled = enabled @@ -496,60 +474,6 @@ func (ss *sessions) handlePing(ping *sessionPing) { bs, sinfo.packet = sinfo.packet, nil ss.core.router.sendPacket(bs) } - // This requests metadata from the remote side fairly quickly after - // establishing the session, and if other time constraints apply (no more - // often than 15 minutes since receiving the last metadata) - //if time.Since(sinfo.metaResTime).Minutes() > 15 { - // if time.Since(sinfo.metaReqTime).Minutes() > 1 { - // ss.sendMetadata(sinfo, false) - // } - //} -} - -func (ss *sessions) sendMetadata(sinfo *sessionInfo, isResponse bool) { - ss.myMetadataMutex.RLock() - meta := sessionMeta{ - IsResponse: isResponse, - Metadata: ss.myMetadata, - } - ss.myMetadataMutex.RUnlock() - bs := meta.encode() - shared := ss.getSharedKey(&ss.core.boxPriv, &sinfo.theirPermPub) - payload, nonce := boxSeal(shared, bs, nil) - p := wire_protoTrafficPacket{ - Coords: sinfo.coords, - ToKey: sinfo.theirPermPub, - FromKey: ss.core.boxPub, - Nonce: *nonce, - Payload: payload, - } - packet := p.encode() - ss.core.router.out(packet) - if !isResponse { - sinfo.metaReqTime = time.Now() - } -} - -// Handles a meta request/response. -func (ss *sessions) handleMetadata(meta *sessionMeta) { - // Get the corresponding session (or create a new session) - sinfo, isIn := ss.getByTheirPerm(&meta.SendPermPub) - // Check the session firewall - if !isIn && ss.sessionFirewallEnabled { - if !ss.isSessionAllowed(&meta.SendPermPub, false) { - return - } - } - if !isIn || sinfo.timedout() { - return - } - if meta.IsResponse { - sinfo.theirMetadata = meta.Metadata - sinfo.metaResTime = time.Now() - - } else { - ss.sendMetadata(sinfo, true) - } } // Used to subtract one nonce from another, staying in the range +- 64. diff --git a/src/yggdrasil/wire.go b/src/yggdrasil/wire.go index 2d87a37..d46994d 100644 --- a/src/yggdrasil/wire.go +++ b/src/yggdrasil/wire.go @@ -364,6 +364,7 @@ func (p *sessionMeta) encode() []byte { pTypeVal = wire_SessionMetaRequest } bs := wire_encode_uint64(pTypeVal) + bs = wire_put_coords(p.SendCoords, bs) if pTypeVal == wire_SessionMetaResponse { bs = append(bs, p.Metadata...) } @@ -378,12 +379,14 @@ func (p *sessionMeta) decode(bs []byte) bool { return false case pType != wire_SessionMetaRequest && pType != wire_SessionMetaResponse: return false + case !wire_chop_coords(&p.SendCoords, &bs): + return false } if p.IsResponse = pType == wire_SessionMetaResponse; p.IsResponse { if len(bs) == 0 { return false } - p.Metadata = make(metadata, len(bs)) + p.Metadata = make(metadataPayload, len(bs)) if !wire_chop_slice(p.Metadata[:], &bs) { return false }