Today we are going to look at loading large datasets in an asynchronous and distributed fashion. In a lot of circumstances it is best to work with such datasets in an entirely distributed fashion, but for this demonstration we will assume that is not possible, because you need to channel the data into some serial process. It doesn’t have to be that way in general. Anyway, we use this to further introduce Channels and RemoteChannels. I have blogged about Channels before; you may wish to skim that first. That article focused on single producer, single consumer. This post will focus on multiple producers, single consumer (though you’ll probably be able to work out multiple consumers from there, as it is pretty symmetrical).

This is an expanded explanation of some code I originally posted on the Julia Discourse. The documentation on channels took me a while to get my head around for this use, so I hope this extra example helps you to understand.

Input:

So let’s take a look at one of these files. We are using CodecZlib to decompress the gzip while reading the stream. Decompressing gzip this way doesn’t add much overhead, so when it isn’t a gzipped tarball I don’t see much reason to extract it. We’ll just read off the first line.
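As a rough sketch of reading a gzipped file line by line without extracting it first, the snippet below writes a tiny gzip file and reads its first line back. It assumes the CodecZlib package is installed; the file name and its contents are made up for illustration and are not the real dataset.

```julia
using CodecZlib  # assumed dependency, the package named in the text

# Hypothetical two-line sample file, created only for this illustration.
path = joinpath(mktempdir(), "sample.json.gz")
open(GzipCompressorStream, path, "w") do stream
    write(stream, "{\"title\": \"first\"}\n{\"title\": \"second\"}\n")
end

# Decompress on the fly while reading; only the first line is pulled through.
first_line = open(GzipDecompressorStream, path) do stream
    readline(stream)
end
```

Because the decompressor wraps the file stream, nothing beyond what `readline` needs has to be decompressed.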

Input:

Output:

So it is clear that each line of these files is a JSON entry representing a different paper. We are going to extract the key information into a Julia structure, including tokenizing the abstract.
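A structure along these lines might look like the sketch below. The field names, the record keys, and the whitespace `tokenize` helper are all assumptions for illustration. The output shown later in this post splits punctuation into its own tokens, so the real tokenizer is clearly something more capable than `split`.

```julia
# Field names are hypothetical; the types follow the output shown later in the post.
struct Paper
    title::String
    authors::Vector{String}
    year::Int
    abstract_toks::Vector{String}
end

# Stand-in tokenizer: plain whitespace splitting, not the real tokenizer.
tokenize(text::AbstractString) = String.(split(text))

# Build a Paper from an already-parsed record. The keys are assumed names.
function Paper(record::AbstractDict)
    Paper(record["title"], record["authors"], record["year"],
          tokenize(record["abstract"]))
end

paper = Paper(Dict("title" => "T", "authors" => ["A B"],
                   "year" => 1996, "abstract" => "A cDNA was cloned"))
```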

Input:

We define a loading function that takes a file name to load and a channel to output to. Like in the earlier post, we are going to create a channel and load some of the data into it. This dataset is huge, so using a channel to lazily load it is important. Otherwise we’ll use all our RAM for no real gain. Loading items you never read is not useful.

It will parse the JSON and load up the data, skipping any papers with no author or abstract.
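The shape of such a loader can be sketched as follows. To stay self-contained it reads from an in-memory `IO` rather than a file name, pushes the raw parsed record rather than a finished structure, and uses made-up keys (`"authors"`, `"abstract"`). It assumes the JSON.jl package and Julia 1.3 or later for the `Channel{T}(size) do ... end` form.

```julia
using JSON  # assumed dependency: the JSON.jl package

# Push every usable record from `io` into `ch`, one JSON entry per line.
function load_records(ch::AbstractChannel, io::IO)
    for line in eachline(io)
        record = JSON.parse(line)
        authors = get(record, "authors", [])
        abstract_text = get(record, "abstract", "")
        # Skip papers with no author or no abstract.
        (isempty(authors) || isempty(abstract_text)) && continue
        put!(ch, record)  # blocks while the buffer is full, so loading stays lazy
    end
end

# Hypothetical sample data standing in for one of the real files.
lines = """
{"title": "A", "authors": ["X"], "abstract": "Some text."}
{"title": "B", "authors": [], "abstract": "No authors."}
{"title": "C", "authors": ["Y"], "abstract": ""}
{"title": "D", "authors": ["Z"], "abstract": "More text."}
"""

ch = Channel{Any}(2) do c
    load_records(c, IOBuffer(lines))
end
titles = [r["title"] for r in ch]
```

With a buffer of 2, the loader never runs more than a couple of records ahead of whoever is consuming the channel.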

Input:

OK, so let’s see how that goes.

Input:

Output:

3-element Array{Paper,1}:
Paper("Molecular cloning, sequencing, and expression of a cDNA encoding alpha-glucosidase from Mucor javanicus.", String["M Sugimoto", "Y Suzuki"], 1996, String["A", "cDNA", "encoding", "Mucor", "javanicus", "alpha-glucosidase", "was", "cloned", "and", "sequenced"  …  "hydrolyzes", "not", "only", "maltose", ",", "but", "also", "soluble", "starch", "."])
Paper("An intensive motor skills treatment program for children with cerebral palsy.", String["Wende Oberg", "Barbara Grams", "Judith Gooch"], 2009, String["This", "article", "describes", "the", "development", "and", "efficacy", "of", "the", "Intensive"  …  "of", "assistance", "on", "short-term", "objectives", "as", "the", "program", "progressed", "."])
Paper("Treatment of retinopathy of prematurity with argon laser photocoagulation.", String["M B Landers", "C A Toth", "H C Semple", "L S Morse"], 1992, String["Fifteen", "eyes", "of", "nine", "infants", "were", "treated", "for", "retinopathy", "of"  …  ".", "No", "intraocular", "hemorrhages", "occurred", "during", "any", "laser", "treatment", "."])


To baseline this, I am going to define a function that loads all the files into a shared channel, in serial. One after the other.
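A minimal sketch of what such a serial baseline might look like, assuming a loader with the signature `fileload_func(channel, filename)`. The argument order, the keyword names, and the default buffer size are assumptions, and `fake_load` is a made-up stand-in for the real loader.

```julia
# One channel, fed by each file in turn. Written for Julia 1.3 or later.
function serial_channel_load(fileload_func, filenames; ctype=Any, csize=128)
    Channel{ctype}(csize) do ch
        for filename in filenames
            fileload_func(ch, filename)
        end
    end  # the channel closes itself when this do block returns
end

# Hypothetical loader: emits two labelled items per "file".
function fake_load(ch, filename)
    for i in 1:2
        put!(ch, "$filename:$i")
    end
end

serial_results = collect(serial_channel_load(fake_load, ["a", "b"]; ctype=String))
```

In the serial version the items come out in file order, which is not guaranteed once several producers feed the channel.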

Input:

Output:

serial_channel_load (generic function with 1 method)


And I’ll define a testing harness.

Input:

Output:

test_loading (generic function with 1 method)


Input:

Output:

In theory, loading files asynchronously should be faster than loading them serially. See Wikipedia. This is on the basis that while the disk is seeking to the file I want to read, Julia can switch the CPU over to another task that wants to do some computation, like parsing JSON.

Just to be clear, async isn’t true parallelism; it is just swapping Tasks when one is blocked or yields. Only one bit of Julia code runs at a time.

Input:

Output:

async_channel_load (generic function with 1 method)


The @async starts a new task for each file. The @sync is needed to avoid exiting this loop before all the contained @asyncs are done. The Channel itself starts a task to run that do block, which starts up all the other tasks. So async_channel_load actually returns the channel almost immediately. Most of the loading will happen as we try to read from the channel.

This is fairly simple now that it is in front of me. But working out how to close the channel only once all the feeders were done took me a little bit. The channel needs to be closed once they are all done, because if the channel is left open then the consumer can’t tell that there is nothing left (and so you can’t use it with a for loop directly, only with take!).
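The pattern described above can be sketched as follows. The `fileload_func(channel, filename)` signature, the keyword names, and the buffer size are assumptions, and `fake_load` is a made-up stand-in for the real loader.

```julia
# Multiple producer tasks feeding one channel. Written for Julia 1.3 or later.
function async_channel_load(fileload_func, filenames; ctype=Any, csize=128)
    Channel{ctype}(csize) do ch
        # @sync blocks until every @async task started inside it has finished,
        # so the do block (and with it the channel) only ends once all feeders are done.
        @sync for filename in filenames
            @async fileload_func(ch, filename)
        end
    end
end

# Hypothetical loader that yields between items so the tasks interleave.
function fake_load(ch, filename)
    for i in 1:2
        put!(ch, "$filename:$i")
        yield()
    end
end

# Order across files is not guaranteed, so sort before comparing.
async_results = sort(collect(async_channel_load(fake_load, ["a", "b", "c"]; ctype=String)))
```

Because the channel is closed when the do block returns, `collect` (or a plain for loop) terminates on its own once the last feeder finishes.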

Input:

Output:

You can see that we are not actually getting any performance gain here. In fact, we are getting a small loss, though I am fairly sure that is below significance, as other times I’ve run this we get a small improvement. I theorize that if we were reading from tens of thousands of small files, rather than 40 large ones, we would see a bigger difference, as in that case the hard drive spends a lot more time moving around the disk. But since it actually spends most of its time reading consecutive entries into the channel’s buffer rather than seeking, the async doesn’t do much.

It is worth noting that because we are using Iterators.take to take a finite number of elements from the channel, we are potentially accumulating zombie tasks. However, I believe they should all be caught by their finalizers once all the channels they are pointing to are garbage collected. It might well be better to close them explicitly, but I have a lot of RAM. (Thanks to Spencer Russell for suggesting I point this out.)
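For what explicit closing might look like, here is a small sketch with a made-up producer that would otherwise keep going long after we stop reading. It is an illustration of the idea, not the code used for the timings in this post.

```julia
# A producer with far more items than we intend to read.
ch = Channel{Int}(2) do c
    for i in 1:1_000_000
        put!(c, i)
    end
end

# Take only the first three items.
first_three = collect(Iterators.take(ch, 3))

# Closing the channel makes the producer's pending put! throw,
# so its task ends instead of sitting blocked on a full buffer.
close(ch)
```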

The distributed version is more fiddly. But it is true parallelism. If there is solid computation to be done on each item, and the time taken to do the inter-process communication isn’t larger than the processing time, then this is going to be faster. If not all records are being returned (e.g. some are skipped), then we know the inter-process communication time is going to be lower than the time to read from disk (since less data is being sent than read).

With that said, we are still IO bound, since the processes all have to share the same physical hard disk (or RAID array in this case).

Input:

Output:

distributed_channel_load (generic function with 1 method)


This is a bit more complex. But actually not a lot once you’ve grokked the async version. We wrap our local Channel into a RemoteChannel, which can be sent to the workers (via closure) effectively. We use a CachingPool so that we only send fileload_func to each worker once (in case it is a big closure carrying a lot of data), and we clear! it to free that memory back up on the workers at the end.

Rather than a loop of @async we have a map of remotecalls. (This could be done a few other ways, including using @spawn, but that macro doesn’t seem to give much that remotecall doesn’t, and macros are intrinsically harder to reason about than functions, since they could be doing anything.) The remotecalls each return a Future that will hit the ready state once the remotecall has completed. So rather than having an @sync to wait for all the enclosed @asyncs, we instead loop over the file_done futures, waiting on each one, to make sure we don’t close the channel before they are all done. The loop of waits takes the place of the @sync.
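Putting those pieces together, a sketch of the distributed loader might read as below. The `fileload_func(channel, filename)` signature, the keyword names, the buffer size, and the two local workers are assumptions, and `fake_load` is a made-up stand-in for the real loader.

```julia
using Distributed
addprocs(2)  # assumption: two local worker processes, just for the demo

function distributed_channel_load(fileload_func, filenames; ctype=Any, csize=128)
    Channel{ctype}(csize) do local_ch
        # Wrap the local channel so that workers can put! into it.
        remote_ch = RemoteChannel(() -> local_ch)
        # A CachingPool sends the loader closure to each worker only once.
        pool = CachingPool(workers())
        file_dones = map(filenames) do filename
            remotecall(fileload_func, pool, remote_ch, filename)
        end
        # Wait on every Future so the channel is not closed too early.
        for file_done in file_dones
            wait(file_done)
        end
        clear!(pool)  # free the cached closure on the workers
    end
end

# Hypothetical loader; as an anonymous function it is shipped to the workers.
fake_load = (ch, filename) -> foreach(i -> put!(ch, "$filename:$i"), 1:2)

dist_results = sort(collect(distributed_channel_load(fake_load, ["a", "b", "c"]; ctype=String)))
rmprocs(workers())  # tidy up the demo workers
```

Items arrive in whatever order the workers produce them, hence the `sort` before looking at the results.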

Input:

Output:

Now, that is cooking with gas.

I hope this has taught you a thing or two about asynchronous and distributed programming in Julia. It would be nice to have a threaded version of this to go with it, but making a threaded version is really annoying, at least until the PARTR work is complete. I am very excited about that PR, though it is a fair way off (Julia 1.x time frame). Eventually (though maybe not in that PR), there is a good chance the threaded version of this code would look just like the asynchronous version.