Thread Parallelism in Julia

Julia has 3 kinds of parallelism. The well known, safe, slowish and easyish, distributed parallelism, via pmap, @spawn and @remotecall. The wellish known, very safe, very easy, not-actually-parallelism, asynchronous parallelism via @async. And the more obscure, less documented, experimental, really unsafe, shared memory parallelism via @threads. It is the last we are going to talk about today.

I’m not sure if I can actually teach someone how to write threaded code. Let alone efficient threaded code. But this is me giving it a shot. The example here is going to be fairly complex. For a much simpler example of use, on a problem that is more easily parallelizable, see my recent stackoverflow post on parallelizing sorting.

(Spoilers: in the end I don’t manage to extract any serious performance gains from parallelizing this prime search. Unlike parallelizing that sorting. Paralising sorting worked out great)

In a previous post, I used prime generation as an example to motivate the use of coroutines as generators. Now coroutines are neither parallelism, nor fast. Lets see how fast we can go if we want to crank it up using Base.Threading. (answer: not as much as you might hope).

I feel that julia threading is a bit nerfed. In that all threading must take place in a for-loop, where work is distributed equally to all threads. And the loop end blocks til all threads are done. You can’t just fire off one thread to do a thing and then let it go. I spent some time a while ago trying to workout how to do that, and in short I found that end of thread block is hard to get around. @async on its own can’t break out of it. Though one could rewrite ones whole program to never actually exit that loop. But then one ends up building own own threading system. And I have a thesis to finish.

Primes

This is the same paragraph from that earlier post. I’ll let you know now, this is not an optimal prime number finding algorithm, by any means. We’re just using it for demonstration. It has a good kind of complexity for talking about shared memory parallelism.

If a number is prime, then no prime (except the number itself), will divide it. Since if it has a divisor that is non-prime, then that divisor itself, will have a prime divisor that will divide the whole. So we only need to check primes as candidate divisors. Further: one does not need to check divisibility by all prior primes in order to check if a number $s$ is prime. One only needs to check divisibility by the primes less than or equal to $\sqrt{x}$, since if $x=a \times b$, for some $a>\sqrt{x}$ that would imply that $b<\sqrt{x}$, and so its composite nature would have been found when $b$ was checked as a divisor.

Here is the channel code for before:

Input:

using Base.Test
using Primes

Input:

function primes_ch(T=BigInt)
    known_primes = T[2]
    Channel(csize=256, ctype=T) do c
        x = big"3"
        put!(c, 2) # Output the first prime, as we already put int in the list of known primes
        while true
            for p in known_primes
                if p > sqrt(x)
                    # Must be prime as we have not found any divisor
                    push!(known_primes, x)
                    put!(c, x)
                    break
                end
                if x % p == 0 # p divides
                    # not prime
                    break
                end
            end
            x+=1            
        end
    end
end

Output:

primes_ch (generic function with 2 methods)

Input:

@time collect(Iterators.take(primes_ch(UInt), 10^4));
@time collect(Iterators.take(primes_ch(BigInt), 10^4));

Output:

  1.076319 seconds (6.91 M allocations: 258.664 MiB, 20.54% gc time)
  1.139354 seconds (6.10 M allocations: 240.307 MiB, 17.69% gc time)

Primes Array

So the first and obvious thing to do is to switch to doing this eagerly with an array.

Input:

function primes_array(n,T=UInt)
    known_primes = T[2]
    sizehint!(known_primes, n)
    
    x=T(3)
    while true
        for p in known_primes
            if p > sqrt(x)
                # Must be prime as we have not found any divisor
                push!(known_primes, x)
                break
            end
            if x % p == 0 # p divides
                # not prime
                break
            end
        end
        x+=1
        length(known_primes) == n && break
    end
    return known_primes
end

Output:

primes_array (generic function with 2 methods)

Input:

@time primes_array(10^4, UInt);
@time primes_array(10^4, BigInt);

Output:

  0.188296 seconds (1.70 M allocations: 26.084 MiB, 11.46% gc time)
  1.356425 seconds (5.89 M allocations: 231.466 MiB, 15.92% gc time)

Input:

@time primes_array(10^5, Int);

Output:

  3.627753 seconds (26.43 M allocations: 404.349 MiB, 0.73% gc time)

This gives an improvement, but not as much as we might really hope for. (as you will see below getting more out of it is harder).

Gutting @threads for

the @threads macro eats for-loops and breaks up their ranges equally, one block per thread. That isn’t very practical is your plan does not just a processing of some data that doesn’t depend strongly on the order of processing.

We don’t plan on sequentially processing the data, since breaking all numbers into equal blocks, would result the final thread not being able to do anything until almost all the other threads were done. For this algorithm we need to know all the prime numbers less than $\sqrt{x}$ before we can check if $x$ is prime. So we have a sequential component.

So we gut the @threads macro, taking the core functionality, and we will manage giving work to the threads ourselves.

Input:

using Base.Threads
@show nthreads()

"""
    everythread(fun)
Run `fun` on everythread.
Returns when every instance of `fun` completes
"""
everythread(fun) = ccall(:jl_threading_run, Ref{Void}, (Any,), fun)

Output:

nthreads() = 10
everythread

Just to check it is working:

Input:

called = fill(false, nthreads())
everythread() do 
    called[threadid()] = true
end
called |> showcompact

Output:

Bool[true, true, true, true, true, true, true, true, true, true]

push_sorted!

Before we can get into actually working on the parallelism, we need another part.

Pushing to the end of our list of known_primes is no longer going to guarantee order. One thing we will need is the ability to push! that does maintain order. Because otherwise we could endup thinking we have checked enough factors but actually we skipped over one. (I made that mistake in an earlier version of this code).

We could use a priority queue for this, but since known primes will always be almost sorted, I think it is going to be faster just to insert the elements into a normal vector. Less pointer dereferencing than using a heap.

Input:

"""
inserts `xs` at the last position such that all elements prior to it are smaller than it.
And all after (if any) are larger.
Assumes `xs` is sorted (increasing),
"""
function push_sorted!(xs, y)
	for ii in endof(xs):-1:1
		@inbounds prev = xs[ii]
		if prev < y
			insert!(xs, ii+1, y) # ii+1 is the index in the resulting array for y
			return xs
		end
	end
	# If got to here then y must be the smallest element, so put at start
	unshift!(xs, y)
end

using Base.Test
@testset "push_sorted!" begin
    xs = [1,2,4]
    @test push_sorted!(xs, 5) == [1,2,4,5]
    @test push_sorted!(xs, 3) == [1,2,3,4,5]
    @test push_sorted!(xs, 0) == [0,1,2,3,4,5]
end;

Output:

Test Summary: | Pass  Total
push_sorted!  |    3      3

primes threaded

Ok so here we go with the main content of this piece.

Here is our plan.

So what can go wrong? The most important part of getting share memory parallism correct is making sure at no point is the same piece of memory being both written and read (or written and written). There is no promise that any operation is actually atomic, except atomic operations, and the setting of locks. Which brings me to our two tools for dealing with ensuring that memory is not dual operated on.

Atomic operators are a small set of operations available on primitive types. They run on atomic types. They might not perform quiet the operation you expect (so read the docs). For example atomic_add!(a::Atomic{T}, b::T) updates the value of a, but returns its old value, as type T. Julia’s atomics come out of LLVM, more or less directly.

Then there are locks. These are what you use if you want to make a block of code (which might modify non-primitively typed memory) not run at the same time as some other block of code. Julia has two kinds of locks TatasLock/SpinLock, and Mutex. We’re going to use the first kind, they are based on atomics. The second kind (the Mutex) is based on lib_uv’s OS agnostic wrapper of they OS’s locking system.

So what do we need to restrict:

The other thing we have going on is that we want to sleep our current thread if we are blocked by waiting for a missing prime. This is done using Condition, wait and notify (docs). The advantage of sleeping the thread while it is waiting is that if oversubscribed (or you are doing other things on the computer), any threads currently waiting for a turn on the CPU can get it. I’m not oversubscribing here so it doesn’t really matter. If anything it is slowing it down. Still it is good practice, and makes you a polite multi-threading citizen.

Input:

function primes_threaded(n,T=UInt128)
    known_primes_lock = SpinLock()
    prime_added = Condition()
    known_primes = Vector{T}()
    sizehint!(known_primes, n + nthreads()) #Allocate extra memory inc
    push!(known_primes, 2)
    
    function safe_length_known_primes()
        lock(known_primes_lock) do
            length(known_primes)
        end
    end
    
    function ith_prime(ii) # try and read the ith prime, if it is available. If not theen wait til it is
        while(safe_length_known_primes()<ii)
            wait(prime_added)
        end
        local ret
        lock(known_primes_lock) do
            @inbounds ret = known_primes[ii]
            # we need this lock incase it is being reshuffled
        end
        ret
    end
    
    function add_prime!(p) # Add a prime to our list and let anyone why was waiting for it know 
        lock(known_primes_lock) do
            push_sorted!(known_primes, p)
        end
        notify(prime_added, all=true)
    end

    
    next_check = Atomic{T}(3) # This is the (potentially prime) number the next thread that asked for something to check will et
    everythread() do
        while(true)
            x=atomic_add!(next_check, T(1)) #atomic_add! returns the *old* value befoe the addition
            for ii in 1:x #Not going to get up to this but it will be fine (except at x=2, got to watch that, goot thing we already have 2 covered)
                p = ith_prime(ii) 
                if p > sqrt(x)
                    # Must be prime as we have not found any divisor
                    add_prime!(x)
                    break
                end
                if x % p == 0 # p divides
                    # not prime
                    break
                end
            end

            if safe_length_known_primes() >= n
                return
            end
        end
    end
    return known_primes
end

Output:

primes_threaded (generic function with 2 methods)

Input:

ps = primes_threaded(10^5, Int)
fails = ps[.!isprime.(ps)]

Output:

0-element Array{Int64,1}

Input:

@time primes_threaded(10^4, Int);
@time primes_array(10^4, Int);

Output:

  0.781142 seconds (3.67 M allocations: 68.560 MiB)
  0.475077 seconds (902.99 k allocations: 13.904 MiB, 49.03% gc time)

Wait it is slower?

That is right, this multi-threaded code is much slower that the array code. Getting performance out of multi-threading is hard.

I can’t teach it. But I can show you want I am going to work it out next. My theory is that there is too much lock contention.

Reducing Lock Contention

Working in blocks reduces contention, it also results in more cache-friendly code. Instead of each thread asking for one number to check then checking it, then asking for another, each thread asks for a bunch to check at a time. The obvious contention reduction is with the atomic next_check. The less obvious is in the lock for known_primes which is checked ever time one wants to know how long it is to test if it is time to exit the loop.

In the code that follows, while each thread asks for a block of numbers to check at a time, it reports found primes individually. I looked at having each thread collect them up localizing in a block and then inserting them into the main-list all at once. But I found that actually slowed things down. It mean allocating a lot more memory, and (I guess) the longer consecutive time in which known_primes was locked for the big inserts was problematic. Delaying checks for longer.

The really big cause of contention, I feel is the time to read known_primes. Especially, the earlier elements. The smaller the prime the more likely it is to be a factor. So we would like to at least be able to check these early primes without worrying about getting locks. To do that we need to maintain a separate list of them.

I initially, just wanted to have an atomic value keeping track of up to how far in known_primes, was safe to read, without having to worry about the elements changing. Such that everything was in one array; and we knew which were safe to read. But we can’t do that, because inserting elements can cause the array to reallocate, so requires it to be locked. So we just use a second array.

Input:

function primes_avoid(n,T=UInt128, blocksize=256)
    known_primes_lock = SpinLock()
    prime_added = Condition()
    
    pre_known_primes = primes_array(blocksize, T) # most common factors, stored so we don't need to lock to check them
    #Unfortunately we need to store a separate list, and can't just have a nonmutating part of the mainlist, as even with sizehinting it sometimes deallocates during an insert
    
    known_primes = Vector{T}()
    # Need to initialize it with enough primes that no block is ever waiting for a prime it itself produces   
    sizehint!(known_primes, n + nthreads()*ceil(Int, log(blocksize))) #Allocate extra memory inc
    
    function safe_length_known_primes()
        lock(known_primes_lock) do
            length(known_primes)
        end
    end
    
    function ith_prime(ii) # try and read the ith prime, if it is available. If not theen wait til it is
        local ret
        if ii < length(pre_known_primes)
            @inbounds ret = pre_known_primes[ii] 
        else
            ii-=length(pre_known_primes) # reindex for main list
        
            while(safe_length_known_primes()<ii)
                wait(prime_added)
            end

            lock(known_primes_lock) do
                @inbounds ret = known_primes[ii] 
            end
        end
        ret
    end
       
    function add_prime!(p) # Add a prime to our list and let anyone why was waiting for it know 
        lock(known_primes_lock) do
            push_sorted!(known_primes, p)
        end
        notify(prime_added, all=true)
    end
    
    next_check = Atomic{T}(blocksize + 1) #already checked the first block during initialisation
    everythread() do
    #(f->f())() do
        while(true)
            x_start=atomic_add!(next_check, T(blocksize)) #atomic_add! returns the *old* value befoe the addition
            x_end = x_start + blocksize
            for x in x_start:x_end
            
                for ii in 1:x #Not going to get up to this but it will be fine (except at x=2, got to watch that, goot thing we already have 2 covered)
                    p = ith_prime(ii) 
                    if p > sqrt(x)
                        # Must be prime as we have not found any divisor
                        add_prime!(x)
                        break
                    end
                    if x % p == 0 # p divides
                        # not prime
                        break
                    end
                end
            end
            safe_length_known_primes() + length(pre_known_primes) > n && return
        end
    end
    
    return append!(pre_known_primes, known_primes)
end

Output:

primes_avoid (generic function with 3 methods)

Input:

@time primes_avoid(10^4, Int, 256);
@time primes_array(10^4, Int);

Output:

  0.235902 seconds (889.55 k allocations: 18.294 MiB)
  0.162847 seconds (902.99 k allocations: 13.904 MiB)

Input:

gc()
@time primes_avoid(10^5, Int, 256);
@time primes_array(10^5, Int);

Output:

  6.886030 seconds (20.65 M allocations: 436.509 MiB)
  4.135483 seconds (26.43 M allocations: 404.349 MiB)

So with that we’ve manage to scrape in a bit closers, but we are still losing to the single threaded array.

Flaw in this algorithm when running in parallel

There is actually a flaw in this algorithm, I think. Potentially, if your threads are far enough out of sync, one could be waiting for a prime potential factor, and the prime factor that arrives next, is not actually the next prime; and further more that prime arriving early is larger than $\sqrt{x}$, so terminates the search; incorrectly reporting $x$ as prime. Which if the next prime to arrive was smaller than $\sqrt{x}$ and was a prime factor of $x$, that would make $x$ not a prime.

One solution would be to keep trace of which indices are actually stable. We know an index is stable if it every thread is now working on checking a number that is greater than the prime at that index.

Pretty sure it is super unlikely and never happens, but a fix for it gives me an idea for how to go faster

Working in blocks seriously

Before I said we were working in blocks but we were still pushing everything into a single array at the end. We could actually work in Vector of Vectors, it makes indexing harder but lets us be fine grained with our locks.

So what we are going to do is at the start of each block, we are going to reserve a point in out Vector of Vectors known primes, as well as what numbers we are going to check.

We need to allocate all the block locations at the start, because increasing the size of an array is not threadsafe. A big complication is we don’t know how many blocks we are going to need. It took me a long time to workout the solution to this. What we do is when we run out of allocated memory we let the block of code that is running on all threads terminate, then we allocate more memory and restart it.

This code is pretty complex. As you can see from all the assert statements it took me a fair bit of debugging to get it right. Its still not much (if at all) better than serial. But I think it well illustrates how you have to turn problems around to eak out speed when trying to parallelize them.

Note in particular how reserved_blocks is a vector of atomics indexed by threadid() to keep track of what memory is being held by what thread.

Input:

function primes_blockmore(n,T=UInt128, blocksize=256)
    reserved_blocks = [Atomic{Int}(typemax(T)) for _ in 1:nthreads()]
    reserved_conds = [Condition() for _ in 1:nthreads()]
    
    function inc_prime_pointer(block, ind) # try and read the ith prime, if it is available. If not theen wait til it is       
        #@assert block<minimum(getindex.(reserved_blocks))
        #@assert(block<=safe_length_known_primes[], "1 rbs= $(getindex.(reserved_blocks)), kp=$(safe_length_known_primes[]), block=$(block)")           
        #@assert(isassigned(known_primes, block), "block not assigned $(threadid()) $(block)")
        if length(known_primes[block])>ind
            (block, ind+1)
        else
            #time to move to the next block
            block += 1
            for (owner, rb) in enumerate(reserved_blocks)
                while true
                    # Check to make sure the block we want to read isn't still reserved.
                    if block == rb[]
                        # Technically I think I actually need to synconize here,
                        # against the lock being released in between me looking at it 
                        # and me wanting to wait for it's condition.
                        @inbounds wait(reserved_conds[owner]) #wait til that block is ready
                        break
                    end
                    length((known_primes[block]))>0 & break # skip empty blocks
                end

            end
            #@assert length(known_primes[block])>0
            #@assert block<minimum(getindex.(reserved_blocks))
            #@assert(block<=safe_length_known_primes[], "2 rbs= $(getindex.(reserved_blocks)), kp=$(safe_length_known_primes[]), block=$(block)")
            (block, 1)
        end
    end
      
    reserving_block_lock = SpinLock()
    next_check = blocksize # Not an atomic as we are already protecting it with a lock
    function get_next_block()
        reservation = Vector{T}()
        lock(reserving_block_lock) do
            
            cur_len = true_length_known_primes[]
            out_of_allocation = cur_len == max_true_length_known_primes
            @assert cur_len < max_true_length_known_primes
            if out_of_allocation
                (true, (reservation, -1,-1)) # could be using a nullable here, but I don't want the pointer
            else
                atomic_add!(true_length_known_primes, 1)
                cur_len += 1
                reserved_blocks[threadid()][] = cur_len
                @inbounds known_primes[cur_len] = reservation
                cur_check = next_check            
                next_check+=blocksize
                
                (false, (reservation, cur_check, cur_check + blocksize))
            end
        end
    end
    
    ## Setup initial known_primes
    known_primes = Vector{Vector{T}}(1)
    max_true_length_known_primes = 1;
    @inbounds known_primes[1] = primes_array(blocksize, T)
    
    @inbounds total_known = Atomic{Int}(length(known_primes[1]))
    true_length_known_primes = Atomic{Int}(1) #The number of blocks that are started, exactly
    safe_length_known_primes = Atomic{Int}(1) #The number of blocks that are done, a lower bound
    
    
    blocksize÷=0.5
    while(total_known[] < n) # This outerloop is to add more memory when we run out of blocks, so everything must sync up
        blocksize*=2 # double the size each round as primes are getting rarer.
        @show total_known[], next_check[]
        flush(STDOUT)
        max_true_length_known_primes += n;
        # The upper bound on how many blocks we will allow
        # unfortunately this is more than 1 block per prime
        # without solving for the inverse Prime number theorem it is hard to bound
        # have to preallocate it, AFAICT `push!` is never threadsafe.
        append!(known_primes, Vector{Vector{T}}(max_true_length_known_primes))
        # We are now in a position to reallocated it
        
        everythread() do
        #(f->f())() do #this line is useful instead of everythread for debugging
            while(true)
                (done, (local_primes, x_start, x_end)) = get_next_block()
                done && return # quit now, we are out of allocated memory
                for x in x_start:x_end
                    pp_block, pp_ind = (1, 1)
                    while(true)

                        # if we have an index for it we know it is safe to read
                        @inbounds p = known_primes[pp_block][pp_ind]
                        if p > sqrt(x)
                            # Must be prime as we have not found any divisor
                            push!(local_primes, x)
                            break
                        end
                        if x % p == 0 # p divides
                            # not prime
                            break
                        end
                        pp_block, pp_ind = inc_prime_pointer(pp_block, pp_ind)
                    end
                end

                #End of block stuff.
                @inbounds notify(reserved_conds[threadid()], all=true)
                atomic_add!(safe_length_known_primes, 1)
                @assert length(local_primes) > 0
                atomic_add!(total_known, length(local_primes))
                total_known[] > n && return
            end
        end
    end
    
    known_primes=known_primes[1:true_length_known_primes[]]
    all_primes = T[]
    sizehint!(all_primes, n)
    reduce(append!, T[], known_primes)    
end

Output:

primes_blockmore (generic function with 3 methods)

Input:

gc()
@time primes_blockmore(10^4, Int, 256);

Output:

(total_known[], next_check[]) = (256, 256)
  0.169644 seconds (847.93 k allocations: 17.723 MiB)

Input:

gc()
pr = @time primes_blockmore(10^5, Int, 256);

Output:

(total_known[], next_check[]) = (256, 256)
  4.106018 seconds (16.94 M allocations: 355.002 MiB)

Input:

gc()
@time primes_blockmore(5*10^5, Int, 256);

Output:

(total_known[], next_check[]) = (256, 256)
 64.109311 seconds (226.03 M allocations: 4.581 GiB, 7.13% gc time)

Input:

gc()
@time primes_array(5*10^5, Int);

Output:

 39.164720 seconds (348.88 M allocations: 5.203 GiB, 1.81% gc time)

Conclusion

So with all that, We are still losing to the single threaded code. Maybe if we were using more threads. Or if the code was smarter, we could pull ahead and go faster. But today, I am willing to admit defeat. It is really hard to make this kinda code actually speed-up.

If you can do better, I’ld be keen to know. You can get the notebook that is behind this post from github, you could even fork it and make a PR and I’ll regenerate this blog post (:-D).

One way to make it much much faster is to use a different algorithm. I’m sure there actually exist well documented parallel prime finders.