
Closed

Issue with DHT groups hanging on Join

description

I may be misunderstanding the way I need to configure DHT, but I appear to be able to get it to hang when trying to add replication. Using the code below, if I start multiple processes then the second and third ones hang on joining the group, and subsequent ones hang on starting ISIS. If I change the DHT parameters to (1, 1, 1) then it works fine (but obviously there's no replication). Am I missing a step?

Thanks,
Ben

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

using Isis;

using Timeout = Isis.Timeout;

namespace ISIS2
{
    delegate void myLhandler(string who);

    internal class Program
    {
        public static Timeout myTO = new Timeout(1000, Timeout.TO_FAILURE);

        public static EOLMarker myEOL = new EOLMarker();

        private static int LOOKUP = 0;

        private static void Main(string[] args)
        {
            Console.WriteLine("Starting");
            IsisSystem.Start();
            Console.WriteLine("Started");

            // First create a group that includes just a single handler
            Group myGroup = new Group("some name");
            myGroup.DHTEnable(2, 4, 2);

            int count = 0;
            myGroup.ViewHandlers += (ViewHandler)(view =>
                {
                    Console.WriteLine("Joined {0}", view);
                    count = view.members.Length;
                });

            Console.WriteLine("Joining");
            myGroup.Join();

            Console.WriteLine("Waiting for enough");
            while (count < 3)
            {
                Thread.Sleep(100);
            }

            Console.WriteLine("Got enough");

            var rand = new Random();
            count = 0;
            var timer = Stopwatch.StartNew();
            while (true)
            {
                var num = rand.Next();
                myGroup.DHTPut(num, num);
                myGroup.DHTGet<int, int>(num);

                ++count;
                if (count % 1000 == 0)
                {
                    Console.WriteLine(count / timer.Elapsed.TotalSeconds);
                }
            }

            IsisSystem.WaitForever();
        }
    }
}
Closed Jun 20, 2015 at 11:59 AM by birman

comments

birman wrote Mar 12, 2015 at 1:15 PM

I actually don't see why that wouldn't work. I'll try it myself later today and then I can get back to you. We do stuff with the DHT all the time, although generally I don't set the minimum group size smaller than the target group size (that is, I would more normally have used (2, 4, 4), not (2, 4, 2)). But I can't think of why that would cause things to hang.

As a debugging suggestion, when a program hangs with Isis2, my recommendation is to have a background thread launched ahead of time and get it to print the string returned by IsisSystem.GetState(). This could itself hang, which would tell me something, but more commonly you get a massive report of the state of the entire system as seen from that particular group member. Then I can usually just glance at it and see why your thread is waiting and what seems to be going on.
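For reference, here is a minimal sketch of that pattern (the method name StartStateDumper and the 30-second interval are just illustrative; it relies only on the Isis2 calls that also appear in the full test program below):

        static void StartStateDumper()
        {
            // Launch before Join() so the dumper keeps running even if the main thread later blocks.
            Thread dumper = new Thread(delegate()
            {
                while (IsisSystem.IsisActive)
                {
                    Isis.Isis.ISIS_SLEEP.WaitOne(30000);          // wake every 30 seconds
                    IsisSystem.WriteLine(IsisSystem.GetState());  // full per-member state report
                }
            });
            dumper.Name = "StateDumper";
            dumper.IsBackground = true;
            dumper.Start();
        }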

For example, here's a little debug program I often use when testing DHT issues (this wants 12 members). Have a look at the first line or two. As you can see, every 2 minutes every group member prints one of these mega status reports to its console, and also to a log file in the logs folder.

I would then normally take that collection of logs and by looking at it, I can tell exactly what the issue is. In fact if you could email me four such logs (cut and paste that first thread start into your code, rerun with 4 group members) I'll be happy to explain how those particular logs can be used to make sense of the state of the system.

Let me add that this could certainly be some kind of Isis2 issue. I haven't been using the DHT for a little while, and perhaps I did something that broke it in the interim. If so it should be pretty trivial to fix...
        internal const int ntuples = 1000;
        internal static int tcount = 0;
        internal static Thread dumper;   // background state-dump thread, started in doDHTTest()

        public static void doDHTTest()
        {
            int myfrank = -1;
            Console.SetWindowSize(60, 30);
            int target = 12;
            IsisSystem.Start();
            if (dumper == null)
            {
                // Background "dumper" thread: wakes once a second and, every 120 seconds,
                // writes ack info plus the full IsisSystem.GetState() report to the console/log.
                dumper = new Thread(delegate()
                {
                    int n = 0;
                    while (IsisSystem.IsisActive)
                    {
                        Isis.Isis.ISIS_SLEEP.WaitOne(1000);
                        if (++n % 120 == 0)
                        {
                            IsisSystem.WriteAckInfo();
                            IsisSystem.WriteLine("At time " + Isis.Isis.NOW + "  " + IsisSystem.GetState());
                        }
                    }
                });
                dumper.Name = "Dumper";
                dumper.Start();
            }
            IsisSystem.WriteLine("Started");
            Group g = new Group("myDHT");
            g.DHTEnable(2, 12, 12);
            bool go = false;
            int myrank = -1;
            g.ViewHandlers += (ViewHandler)delegate(View v)
            {
                myrank = v.GetMyRank();
                Console.Title = "[" + v.gname + "]:  " + Isis.Isis.my_address + " rank=" + myrank + " in view " + v.viewid;
                if (v.GetSize() >= target)
                {
                    go = true;
                    myfrank = myrank;
                }
                IsisSystem.WriteLine("go=" + go + " with [" + v.gname + "]:  " + Isis.Isis.my_address + " rank=" + myrank + " in view " + v);
            };
            g.Handlers[0] += (Action<int>)delegate(int n)
            {
                IsisSystem.WriteLine("Entry to new parallel Select logic; looking for key%77==" + n + ", my portion of the DHT contains " + g.DHT<int, string>().Count() + " Key,Value pairs");
                IEnumerable<string> newList = g.DHT<int, string>().Where(kvp => kvp.Key % 77 == n).Select(kvp => kvp.Value);
                IsisSystem.WriteLine("After parallel Linq operation, my contribution contains " + newList.Count() + " matching tuples");
            };
            g.Join();
            while (!go)
                Isis.Isis.ISIS_SLEEP.WaitOne(500);
            int start = (IsisSystem.GetMyAddress().pid % 10) * 100000;
            for (int n = start; n < start + ntuples; n++)
                g.DHTPut(n, "<" + (n * 5 % 100).ToString() + ">");
            IsisSystem.WriteLine("After " + ntuples + " DHT put operations, now testing multi-put");
            for (int k = 0; k < 10; k++)
            {
                List<KeyValuePair<int, string>> myList = new List<KeyValuePair<int, string>>();
                for (int n = start + 1000 + 10 * k; n < start + 1000 + ntuples / 4 + 10 * k; n++)
                    myList.Add(new KeyValuePair<int, string>(n, "<M<" + (n * 5 % 1000).ToString() + ">M>"));
                g.DHTPut(myList);
            }
            IsisSystem.WriteLine("After 10 DHT multi-put operations, now testing Ordered multi-put");
            for (int k = 0; k < 10; k++)
            {
                List<KeyValuePair<int, string>> myList = new List<KeyValuePair<int, string>>();
                for (int n = start + 2000 + 100 * k; n < start + 2000 + ntuples / 4 + 100 * k; n++)
                    myList.Add(new KeyValuePair<int, string>(n, "<MO<" + (n * 5 % 1000).ToString() + ">MO>"));
                g.DHTOrderedPut(myList);
            }
            IsisSystem.WriteLine("After 10 DHT Ordered multi-put operations, now testing Search");
            while ((g.flags & Group.G_TERMINATING) == 0 && g.GroupOpen)
            {
                List<KeyValuePair<int, string>> result = null;
                IsisSystem.WriteLine("Doing 3 pt-to-pt get operations");
                foreach (var s in new List<int> { start, start + 1000, start + 2000 })
                    if ((g.flags & Group.G_TERMINATING) == 0 && g.GroupOpen)
                        IsisSystem.WriteLine("     DHTGet(" + s + ") returns " + g.DHTOrderedGet<int, string>(s));
                IsisSystem.WriteLine("Now doing 500 3-Get operations");
                for (int count = 0; count < 500 && ((g.flags & Group.G_TERMINATING) == 0) && g.GroupOpen; count++)
                {
                    ++tcount;
                    long stime = Isis.Isis.NOW;
                    result = g.DHTOrderedGet<int, string>(new List<int>() { start + count + myrank * 10, start + 1000 + count + myrank * 15, start + 2000 + count + myrank * 25 });
                    Isis.Isis.WriteLine("DHTOrderedGet[" + count + "]: N=" + result.Count() + ", dt=" + (Isis.Isis.NOW - stime));
                }
                if (myfrank >= 12 && myfrank <= 14)
                {
                    IsisSystem.WriteLine("Gracefully exiting"); Isis.Isis.ISIS_SLEEP.WaitOne(1000);
                    g.Leave();
                    IsisSystem.Shutdown();
                    Environment.Exit(0);
                }
                if (tcount >= 175 && myfrank == 11)
                {
                    IsisSystem.WriteLine("Terminate the DHT group");
                    g.Terminate();
                }
                foreach (var kvp in result)
                    IsisSystem.WriteLine("     DHTGet(" + kvp.Key + ") returns " + kvp.Value);
                IsisSystem.WriteLine("Doing 5 parallel search operations");
                for (int n = 0; n < 5 && ((g.flags & Group.G_TERMINATING) == 0) && g.GroupOpen; n++)
                    g.OrderedSend(0, n);
            }
            IsisSystem.WriteLine("*********** Finished in DHT test **************");
        }

bencyoung wrote Mar 12, 2015 at 4:02 PM

Ok, I'm just trying to get codeplex to let me upload the logs. Putting a breakpoint where the join hangs, it's at this line:
            if ((g.flags & G_NEEDSTATEXFER) != 0)
            {
                ILock.NoteThreadState("xferWait.WaitOne()");
                g.xferWait.WaitOne();  // <-- hangs here
                ILock.NoteThreadState(null);
                g.xferWait.Release();
            }

bencyoung wrote Mar 12, 2015 at 4:04 PM

Ok, uploading isn't happy for some reason, so pasting them here!

Log1:

Starting
Started
Joining
Joined View[gname=<some name>, gaddr=(0:224.0.76.209/0:0), viewid=0; 1 members={ [(107636)]}, nextIn={ <0:0> 0:0 }, minStable=-1 (last sent -1), StableTo={ ** 0 }, joining={ [(107636)]}, leaving={ []}, IamLeader = True
Waiting for enough
Joined View[gname=<some name>, gaddr=(0:224.0.76.209/0:0), viewid=1; 2 members={ [(107636)(106972)]}, nextIn={ <1:0> 1:0 1:0 }, minStable=-1 (last sent -1), StableTo={ ** 0 0 }, joining={ [(106972)]}, leaving={ []}, IamLeader = True
At time 47308
----------------------- ISIS state for (107636:10.254.215.11/49157:49158) [leaderId=0]:
Summary of network statistics:
SENT: 44 UDP (17760 non-duplicated bytes), 0 tokens, 44 IPMC (30152 bytes; 12 were stability packets), 66 Acks, 0 Nacks, 11 Discards.
RECV: 55 UDP (11 were dups; 19256 bytes), 0 tokens, 55 IPMC (37340 bytes; 14 were stability packets), 11 were dups, 117 Acks, 0 Nacks, 0 token-triggered resends
GROUPS:
group <ISISMEMBERS>... gaddr (0:224.0.191.199/0:0), IP address [VirtIP: 224.0.191.199, PhysIP: 224.0.19.138]
View[gname=<ISISMEMBERS>, gaddr=(0:224.0.191.199/0:0), viewid=2;  3 members={ [(107636)(106972)(82092)]}, nextIn={  <2:0> 2:1 2:0 2:0 }, minStable=0 (last sent -1), StableTo={ ** 0 0 0 }, joining={ [(82092)]}, leaving={ []}, IamLeader = True
My recieved multicast count 14, rate 0, next outgoing msgid 1, Need to send stability info (last sent at 19.191; MaxBacklogSent=0)
     AGGREGATION STATE:
        Aggregator IsisAggregator<System.Int32, Isis.MCMDSocket+GRPair[]> level 2
        Thread MCMD Remapping task is waiting for Aggregation Result(Key=1)

group <some name>... gaddr (0:224.0.76.209/0:0){ wedged }, IP address [VirtIP: 224.0.76.209, PhysIP: 224.0.19.138]
View[gname=<some name>, gaddr=(0:224.0.76.209/0:0), viewid=1;  2 members={ [(107636)(106972)]}, nextIn={  <1:1> 1:0 1:0 }, minStable=-1 (last sent -1), StableTo={ ** 0 0 }, joining={ [(106972)]}, leaving={ []}, IamLeader = True
My recieved multicast count 3, rate 0, next outgoing msgid 0, Don't need to send stability info (last sent at 0.000; MaxBacklogSent=0)

MEMBERSHIP ORACLE:
group <ORACLE>... gaddr (0:224.0.19.136/0:0), IP address [VirtIP: 224.0.19.136, PhysIP: 224.0.19.136]
View[gname=<ORACLE>, gaddr=(0:224.0.19.136/0:0), viewid=2;  3 members={ [(107636)(106972)(127280)]}, nextIn={  <2:0> 2:10 2:0 2:1 }, minStable=9 (last sent 9), StableTo={ ** 9 0 0 }, joining={ [(127280)]}, leaving={ []}, IamLeader = True
My recieved multicast count 70, rate 0, next outgoing msgid 10, Don't need to send stability info (last sent at 21.191; MaxBacklogSent=0)
TRACKING PROXIES:
   <ISISMEMBERS/(0:224.0.191.199/0:0)/VUP> [VIP: 224.0.191.199, PIP: 224.0.19.138]  vid 2: next mid=2:0, rate 0,  {[(107636)(106972)(82092)]}
   <some name/(0:224.0.76.209/0:0)/VUP> [VIP: 224.0.76.209, PIP: 224.0.19.138]  vid 1: next mid=1:1, rate 0,  {[(107636)(106972)]}
PROPOSED VIEW DELTAS:
     LeaderId=0, Group <some name> (0:224.0.76.209/0:0) (mmap 224.0.76.209:224.0.19.138), isLarge=False, prevVid 1, final msg counts: { 0  0 }, Joining: {[(82092)]}, Leaving: {[]}
Timer State: NOW = 47.326...
Heard from recently: {(107636)@46.196 (127280)@46.198 (106972)@46.716 (82092)@40.157 }
RIPList = []; Group RIPList = []
Group view events list:
Action[8]: JOIN (127280) on groups { <ISISMEMBERS>}, gaddrs {[(0:224.0.191.199/0:0)]}
AwaitReplies State:
Rendezvous[10]: <some name>, using lock [reply-wait with ilock barrier:0], 1:0, tid = 15 Need 1 additional replies (got 1 replies from <(107636)>, waiting for replies from <(106972)>), reply wait-id 0
Flow Control: 0+0/1 + 0 outgoing/unstable/on-todo-list/etc, 0 on P2P or callback queues, 0 backlog, 0 full RecvBB slots, 0 undelivered, 0 in UDP tunnel, 0 in IPMC tunnel, remote backlog=0 (limit=100)
          0 threads waiting on local congestion, 0 waiting on remote congestion
Pending Send Buffer:
Large-Group Pending Send Buffer:
P2PSequencer states
-- P2PS [(127280)], inseqn 13, outseqn 16, non-raw outseqn 16, backlog 0, last callback @ 45.498, remote backlog=0
-- P2PS [(106972)], inseqn 40, outseqn 18, non-raw outseqn 18, backlog 0, last callback @ 46.716, remote backlog=0
-- P2PS [(107636)], inseqn 15, outseqn 15, non-raw outseqn 0, backlog 0, last callback @ 0.000, remote backlog=0
-- P2PS [(82092)], inseqn 2, outseqn 7, non-raw outseqn 7, backlog 0, last callback @ 0.000, remote backlog=0
MCMD Mapping:
Group [ISISMEMBERS]: rate = 0    VirtIPAddr=224.0.191.199, mapped to 224.0.19.138
Group [some name]: rate = 0    VirtIPAddr=224.0.76.209, mapped to 224.0.19.138
Group [ORACLE]: rate = 0    VirtIPAddr=224.0.19.136, mapped to 224.0.19.136
MCMD VIRTUAL SLOTS
Virtual socket[0]: (224.0.19.136:224.0.19.136), physical-slot [0], backlog 0
Virtual socket[1]: (224.0.191.199:224.0.19.138), physical-slot [1], backlog 0
Virtual socket[2]: (224.0.76.209:224.0.19.138), physical-slot [1], backlog 0
MCMD PHYSICAL SLOTS
Physical socket[0]: 224.0.19.136, refcount=1
in-thread <ISIS MCMD: receive-on 224.0.19.136>
out-thread <ISIS MCMD: send-to 224.0.19.136>, backlog 0
Physical socket[1]: 224.0.19.138, refcount=2
in-thread <ISIS MCMD: receive-on 224.0.19.138>
out-thread <ISIS MCMD: send-to 224.0.19.138>, backlog 0
THREAD WAITS:
Thread <Dedicated P2P receiver thread>: P2P.Receive
Thread <Isis token-loop thread>: Sleep(40)
Thread <Isis All-Groups HeartBeat thread>: Sleep(30000)
Thread <Isis ack-socket socket reader>: Ack.Receive
Thread <Isis <ORACLE> HeartBeat thread>: Sleep(1000)
LOCKED OBJECTS: NOT TRACKING DETAILED USE
THREAD ILOCK STATE:
Thead[*](Ack-processing thread) is waiting to remove an object from AckBB
Thead[*](Receive-processing thread) is waiting to remove an object on RecvBB
Thread[0] (Isis p2p-socket socket reader): Waiting for [p2pB:1]
Thread[1] (Ack processing thread): Waiting for [ackb:1]
Thread[2] (<ORACLE> incoming p2p delivery thread): Waiting for [delivery:3]
Thread[3] (<ORACLE> incoming multicasts delivery thread): Waiting for [delivery:1]
Thread[4] (ISIS MCMD: send-to 224.0.19.136): Waiting for [bb-out:1]
Thread[5] (Isis <ORACLE> incoming thread): Waiting for [bb-in:1]
Thread[11] (<ISISMEMBERS> incoming p2p delivery thread): Waiting for [delivery:7]
Thread[12] (<ISISMEMBERS> incoming multicasts delivery thread): Waiting for [delivery:5]
Thread[13] (IM_IPMCViewCast_TUNNEL thread): Waiting for [ipmc:1]
Thread[14] (Loopback thread): Waiting for [barrier:1]
Thread[17] (ISIS MultiQuery thread for <some name>): Waiting for [reply-wait with ilock barrier:0]; while watching Group (0:224.0.76.209/0:0), members [(107636)(106972)]
Thread[21] (ISIS MCMD: send-to 224.0.19.138): Waiting for [bb-out:3]
Thread[22] (Isis <ISISMEMBERS> incoming thread): Waiting for [bb-in:3]
Thread[23] (<some name> incoming p2p delivery thread): Waiting for [delivery:11]
Thread[24] (<some name> incoming multicasts delivery thread): Waiting for [delivery:9]
Thread[28] (Isis <some name> incoming thread): Waiting for [bb-in:5]
SEMAPHORE STATE:
(barrier) [reply-wait with ilock barrier:0], Nwaiters = 1
(bounded buffer) [barrier:1], Nwaiters = 1
(bounded buffer) [bb-out:1], Nwaiters = 1
(bounded buffer) [bb-out:3], Nwaiters = 1
(bounded buffer) [bb-in:1], Nwaiters = 1
(bounded buffer) [bb-in:3], Nwaiters = 1
(bounded buffer) [bb-in:5], Nwaiters = 1
[ackb:1], Nwaiters = 1
[p2pB:1], Nwaiters = 1
[ipmc:1], Nwaiters = 1
[delivery:1], Nwaiters = 1
[delivery:3], Nwaiters = 1
[delivery:5], Nwaiters = 1
[delivery:7], Nwaiters = 1
[delivery:9], Nwaiters = 1
[delivery:11], Nwaiters = 1
BOUNDED BUFFERS:
<BB:P2P> size=1024 (0 full), GetLock=[p2pB:1], PutLock=[p2pB:0]
<BB:Ack> size=512 (0 full), GetLock=[ackb:1], PutLock=[ackb:0]
<ORACLE:DeliverInOrder(IPMC)> size=5120 (0 full), GetLock=[delivery:1], PutLock=[delivery:0]
<ORACLE:DeliverInOrder(P2P)> size=1024 (0 full), GetLock=[delivery:3], PutLock=[delivery:2]
<ORACLE:MCMD> size=512 (0 full), GetLock=[bb-in:1], PutLock=[bb-in:0]
<MCMD-outgoing:224.0.19.136> size=512 (0 full), GetLock=[bb-out:1], PutLock=[bb-out:0]
<ISISMEMBERS:DeliverInOrder(IPMC)> size=5120 (0 full), GetLock=[delivery:5], PutLock=[delivery:4]
<ISISMEMBERS:DeliverInOrder(P2P)> size=1024 (0 full), GetLock=[delivery:7], PutLock=[delivery:6]
<IPMC:BB> size=256 (0 full), GetLock=[ipmc:1], PutLock=[ipmc:0]
<Loopback> size=1024 (0 full), GetLock=[barrier:1], PutLock=[barrier:0]
<ISISMEMBERS:MCMD> size=512 (0 full), GetLock=[bb-in:3], PutLock=[bb-in:2]
<MCMD-outgoing:224.0.19.138> size=512 (0 full), GetLock=[bb-out:3], PutLock=[bb-out:2]
<some name:DeliverInOrder(IPMC)> size=5120 (0 full), GetLock=[delivery:9], PutLock=[delivery:8]
<some name:DeliverInOrder(P2P)> size=1024 (0 full), GetLock=[delivery:11], PutLock=[delivery:10]
<some name:MCMD> size=512 (0 full), GetLock=[bb-in:5], PutLock=[bb-in:4]
-------------------------End of State Dump------------------------------------

At time 77342
----------------------- ISIS state for (107636:10.254.215.11/49157:49158) [leaderId=0]:
Summary of network statistics:
SENT: 66 UDP (24272 non-duplicated bytes), 0 tokens, 47 IPMC (31844 bytes; 12 were stability packets), 108 Acks, 0 Nacks, 17 Discards.
RECV: 97 UDP (17 were dups; 31688 bytes), 0 tokens, 58 IPMC (39032 bytes; 14 were stability packets), 17 were dups, 145 Acks, 0 Nacks, 0 token-triggered resends
GROUPS:
group <ISISMEMBERS>... gaddr (0:224.0.191.199/0:0), IP address [VirtIP: 224.0.191.199, PhysIP: 224.0.19.138]
View[gname=<ISISMEMBERS>, gaddr=(0:224.0.191.199/0:0), viewid=2;  3 members={ [(107636)(106972)(82092)]}, nextIn={  <2:0> 2:1 2:0

bencyoung wrote Mar 12, 2015 at 4:04 PM

Oh, that didn't work!

birman wrote Mar 12, 2015 at 6:09 PM

Ben, in these logs the group never reaches 4 members. In the log you posted above (misformatted by the codeplex system) it hits 3 but doesn't make it to 4. In the logs you sent me, it looks like you actually ended up with 2 partitioned Isis groups.

Could it be that you've ended up with Isis partitioned because of a firewall rule?

This would definitely explain your issue: because of the wait loop in your code, in this particular case the hang is occurring in your own code rather than inside Isis. You might want to just try running 4 copies on one machine.

But in fact this raises another point: if you set up the DHT with (2,4,2) this says shards should be of size 2, hence with just 2 members, you'll be running with one shard and one "empty" shard. If a key maps to the empty shard, the system would throw a "depopulated shard" exception.

So you want (2,4,4) in any case.
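For concreteness, here is a minimal sketch of what that change looks like against the original repro (parameter meanings as described above; the rest of the program is unchanged):

            Group myGroup = new Group("some name");
            // Keep the minimum group size equal to the target size, i.e. (2, 4, 4) rather
            // than (2, 4, 2), so no shard can be left "depopulated" while members are joining.
            myGroup.DHTEnable(2, 4, 4);

            int count = 0;
            myGroup.ViewHandlers += (ViewHandler)(view => { count = view.members.Length; });

            myGroup.Join();
            while (count < 4)        // run 4 copies and wait for all of them before DHTPut/DHTGet
                Thread.Sleep(100);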

birman wrote Mar 12, 2015 at 6:10 PM

PS: The group you created is listed in the log and you can actually watch its membership changing, although of course here you see it every 2 minutes. But the deal is that the system-defined "toString" method for a group view does show the group name, address, view-id and member list... so you can actually see who is playing which role, etc.

birman wrote Mar 12, 2015 at 9:58 PM

OK, mea culpa! Something indeed is broken. I've reproduced this and will post a patched version shortly (in fact this was caused by a change we made for a completely different purpose, which seems to have disrupted the start sequence in a way that causes your DHT application to hang).

birman wrote Mar 13, 2015 at 2:25 PM

Attached is a workaround. The issue was very specific to the DHT and related to some code that was supposed to do highly parallel state transfer (imagine a DHT with 5000 members, sharded into 3- or 4-node subgroups, and now imagine that 75 new members join while 50 or so leave). My kind of thing, and the code in question was very distributed and rather cool -- a kind of N-to-M shuffle. I think it works.

But in fact it clearly had at least one issue, related to groups that are below the minimum specified size. I need to look more closely to see exactly what the real issue is. This one-line patch works around it, but I think a different fix is probably needed (this hack could cause a big surge of load if you had 5000 members and a minimum size of 4900 and the group dipped below 4900 briefly -- the right fix would still be highly parallel on recovery, whereas this hack would effectively DDoS the joiners with that surge, which isn't ideal). So on Monday I will probably upload a different fix, after finding the < to change to a <= and then retesting really carefully.

birman wrote Mar 16, 2015 at 12:39 PM

I deleted the attachment and am uploading the proper fix now as a patch to our main release.

bencyoung wrote Mar 17, 2015 at 10:12 AM

Thanks. I can confirm the initial fix worked, and I'll test the official change soon.