diff --git a/cmd/yggdrasilsim/dial.go b/cmd/yggdrasilsim/dial.go new file mode 100644 index 0000000..c7892d4 --- /dev/null +++ b/cmd/yggdrasilsim/dial.go @@ -0,0 +1,61 @@ +package main + +import ( + "fmt" + "sort" + "time" + + "github.com/yggdrasil-network/yggdrasil-go/src/crypto" +) + +func doListen(recvNode *simNode) { + // TODO be able to stop the listeners somehow so they don't leak across different tests + for { + c, err := recvNode.listener.Accept() + if err != nil { + panic(err) + } + c.Close() + } +} + +func dialTest(sendNode, recvNode *simNode) { + if sendNode.id == recvNode.id { + fmt.Println("Skipping dial to self") + return + } + var mask crypto.NodeID + for idx := range mask { + mask[idx] = 0xff + } + for { + c, err := sendNode.dialer.DialByNodeIDandMask(nil, &recvNode.nodeID, &mask) + if c != nil { + c.Close() + return + } + if err != nil { + fmt.Println("Dial failed:", err) + } + time.Sleep(time.Second) + } +} + +func dialStore(store nodeStore) { + var nodeIdxs []int + for idx, n := range store { + nodeIdxs = append(nodeIdxs, idx) + go doListen(n) + } + sort.Slice(nodeIdxs, func(i, j int) bool { + return nodeIdxs[i] < nodeIdxs[j] + }) + for _, idx := range nodeIdxs { + sendNode := store[idx] + for _, jdx := range nodeIdxs { + recvNode := store[jdx] + fmt.Printf("Dialing from node %d to node %d / %d...\n", idx, jdx, len(store)) + dialTest(sendNode, recvNode) + } + } +} diff --git a/cmd/yggdrasilsim/main.go b/cmd/yggdrasilsim/main.go new file mode 100644 index 0000000..25504c9 --- /dev/null +++ b/cmd/yggdrasilsim/main.go @@ -0,0 +1,6 @@ +package main + +func main() { + store := makeStoreSquareGrid(4) + dialStore(store) +} diff --git a/cmd/yggdrasilsim/node.go b/cmd/yggdrasilsim/node.go new file mode 100644 index 0000000..65e6a80 --- /dev/null +++ b/cmd/yggdrasilsim/node.go @@ -0,0 +1,28 @@ +package main + +import ( + "io/ioutil" + + "github.com/gologme/log" + + "github.com/yggdrasil-network/yggdrasil-go/src/config" + "github.com/yggdrasil-network/yggdrasil-go/src/crypto" + "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil" +) + +type simNode struct { + core yggdrasil.Core + id int + nodeID crypto.NodeID + dialer *yggdrasil.Dialer + listener *yggdrasil.Listener +} + +func newNode(id int) *simNode { + n := simNode{id: id} + n.core.Start(config.GenerateConfig(), log.New(ioutil.Discard, "", 0)) + n.nodeID = *n.core.NodeID() + n.dialer, _ = n.core.ConnDialer() + n.listener, _ = n.core.ConnListen() + return &n +} diff --git a/cmd/yggdrasilsim/store.go b/cmd/yggdrasilsim/store.go new file mode 100644 index 0000000..6fce81a --- /dev/null +++ b/cmd/yggdrasilsim/store.go @@ -0,0 +1,41 @@ +package main + +type nodeStore map[int]*simNode + +func makeStoreSingle() nodeStore { + s := make(nodeStore) + s[0] = newNode(0) + return s +} + +func linkNodes(a *simNode, b *simNode) { + la := a.core.NewSimlink() + lb := b.core.NewSimlink() + la.SetDestination(lb) + lb.SetDestination(la) + la.Start() + lb.Start() +} + +func makeStoreSquareGrid(sideLength int) nodeStore { + store := make(nodeStore) + nNodes := sideLength * sideLength + idxs := make([]int, 0, nNodes) + // TODO shuffle nodeIDs + for idx := 1; idx <= nNodes; idx++ { + idxs = append(idxs, idx) + } + for _, idx := range idxs { + n := newNode(idx) + store[idx] = n + } + for idx := 0; idx < nNodes; idx++ { + if (idx % sideLength) != 0 { + linkNodes(store[idxs[idx]], store[idxs[idx-1]]) + } + if idx >= sideLength { + linkNodes(store[idxs[idx]], store[idxs[idx-sideLength]]) + } + } + return store +} diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 733b9ac..7f6b9b5 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -217,9 +217,23 @@ func (intf *linkInterface) handler() error { intf.link.mutex.Unlock() // Create peer shared := crypto.GetSharedKey(myLinkPriv, &meta.link) + out := func(msgs [][]byte) { + // nil to prevent it from blocking if the link is somehow frozen + // this is safe because another packet won't be sent until the link notifies + // the peer that it's ready for one + intf.writer.sendFrom(nil, msgs, false) + } + linkOut := func(bs []byte) { + // nil to prevent it from blocking if the link is somehow frozen + // FIXME this is hypothetically not safe, the peer shouldn't be sending + // additional packets until this one finishes, otherwise this could leak + // memory if writing happens slower than link packets are generated... + // that seems unlikely, so it's a lesser evil than deadlocking for now + intf.writer.sendFrom(nil, [][]byte{bs}, true) + } phony.Block(&intf.link.core.peers, func() { // FIXME don't use phony.Block, it's bad practice, even if it's safe here - intf.peer = intf.link.core.peers._newPeer(&meta.box, &meta.sig, shared, intf, func() { intf.msgIO.close() }) + intf.peer = intf.link.core.peers._newPeer(&meta.box, &meta.sig, shared, intf, func() { intf.msgIO.close() }, out, linkOut) }) if intf.peer == nil { return errors.New("failed to create peer") @@ -228,20 +242,6 @@ func (intf *linkInterface) handler() error { // More cleanup can go here intf.peer.Act(nil, intf.peer._removeSelf) }() - intf.peer.out = func(msgs [][]byte) { - // nil to prevent it from blocking if the link is somehow frozen - // this is safe because another packet won't be sent until the link notifies - // the peer that it's ready for one - intf.writer.sendFrom(nil, msgs, false) - } - intf.peer.linkOut = func(bs []byte) { - // nil to prevent it from blocking if the link is somehow frozen - // FIXME this is hypothetically not safe, the peer shouldn't be sending - // additional packets until this one finishes, otherwise this could leak - // memory if writing happens slower than link packets are generated... - // that seems unlikely, so it's a lesser evil than deadlocking for now - intf.writer.sendFrom(nil, [][]byte{bs}, true) - } themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box)) themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 7eef9a1..801691a 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -123,7 +123,7 @@ func (ps *peers) _updatePeers() { } // Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number. -func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf *linkInterface, closer func()) *peer { +func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf *linkInterface, closer func(), out func([][]byte), linkOut func([]byte)) *peer { now := time.Now() p := peer{box: *box, sig: *sig, @@ -134,6 +134,8 @@ func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShar close: closer, core: ps.core, intf: intf, + out: out, + linkOut: linkOut, } oldPorts := ps.ports newPorts := make(map[switchPort]*peer) diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 71d9260..1bb14c4 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -62,17 +62,17 @@ func (r *router) init(core *Core) { }, } var p *peer - phony.Block(&r.core.peers, func() { - // FIXME don't block here! - p = r.core.peers._newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil) - }) - p.out = func(packets [][]byte) { + peerOut := func(packets [][]byte) { r.handlePackets(p, packets) r.Act(p, func() { // after the router handle the packets, notify the peer that it's ready for more p.Act(r, p._handleIdle) }) } + phony.Block(&r.core.peers, func() { + // FIXME don't block here! + p = r.core.peers._newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil, peerOut, nil) + }) p.Act(r, p._handleIdle) r.out = func(bs []byte) { p.handlePacketFrom(r, bs) } r.nodeinfo.init(r.core) diff --git a/src/yggdrasil/simlink.go b/src/yggdrasil/simlink.go new file mode 100644 index 0000000..f830c21 --- /dev/null +++ b/src/yggdrasil/simlink.go @@ -0,0 +1,91 @@ +package yggdrasil + +import ( + "errors" + "github.com/Arceliar/phony" +) + +type Simlink struct { + phony.Inbox + rch chan []byte + dest *Simlink + link *linkInterface + started bool +} + +func (s *Simlink) readMsg() ([]byte, error) { + bs, ok := <-s.rch + if !ok { + return nil, errors.New("read from closed Simlink") + } + return bs, nil +} + +func (s *Simlink) _recvMetaBytes() ([]byte, error) { + return s.readMsg() +} + +func (s *Simlink) _sendMetaBytes(bs []byte) error { + _, err := s.writeMsgs([][]byte{bs}) + return err +} + +func (s *Simlink) close() error { + defer func() { recover() }() + close(s.rch) + return nil +} + +func (s *Simlink) writeMsgs(msgs [][]byte) (int, error) { + if s.dest == nil { + return 0, errors.New("write to unpaired Simlink") + } + var size int + for _, msg := range msgs { + size += len(msg) + bs := append([]byte(nil), msg...) + phony.Block(s, func() { + s.dest.Act(s, func() { + defer func() { recover() }() + s.dest.rch <- bs + }) + }) + } + return size, nil +} + +func (c *Core) NewSimlink() *Simlink { + s := &Simlink{rch: make(chan []byte, 1)} + n := "Simlink" + var err error + s.link, err = c.link.create(s, n, n, n, n, false, true) + if err != nil { + panic(err) + } + return s +} + +func (s *Simlink) SetDestination(dest *Simlink) error { + var err error + phony.Block(s, func() { + if s.dest != nil { + err = errors.New("destination already set") + } else { + s.dest = dest + } + }) + return err +} + +func (s *Simlink) Start() error { + var err error + phony.Block(s, func() { + if s.started { + err = errors.New("already started") + } else { + s.started = true + go s.link.handler() + } + }) + return err +}