About comprehensions, BigInts, Interfaces, Iterators and Tasks in Julia
This post is the continuation of the intro to Julia.
The notebook
The IPython notebook used to generate this post is available on github. You can clone it locally, or sync a local folder to it on JuliaBox.
BigInts and array comprehensions: Fermat numbers
A Fermat number is a positive integer of the form F(n) = 2^(2^n) + 1, with n a non-negative integer.
The only known Fermat numbers that are also primes are F(0) = 3, F(1) = 5, F(2) = 17, F(3) = 257, and F(4) = 65537. In the following sections, I test two algorithms for prime decomposition on sequences of Fermat numbers.
Julia does not automatically promote from Integer to BigInt.
Thus, I prefer to handle overflows explicitly:
if needed, I promote the argument to BigInt, to propagate the BigInt type everywhere.
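As a quick illustration of why the explicit promotion matters (a sketch, not part of the original notebook): plain Int64 arithmetic wraps around silently, while BigInt grows as needed.

```julia
# Int64 exponentiation overflows silently: 2^63 does not fit in a signed 64-bit integer
println(2^63)          # wraps around to -9223372036854775808
println(BigInt(2)^63)  # 9223372036854775808 -- the BigInt result is exact
```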
Note that in the following cell I build the array using an array comprehension.
In [1]:
function fermat(n::Integer)
    if n >= 6
        n = BigInt(n)
    end
    2 ^ (2^n) + 1
end
fermat(from, to) = [fermat(i) for i in from:to]
fermat(0, 6)
7-element Array{Union{BigInt,Int64},1}:
3
5
17
257
65537
4294967297
18446744073709551617
Iteration interface and Tasks: Trial Division
The most naive algorithm for integer factorization is Trial Division: to find the prime factors of n, we try to divide it by all the prime numbers smaller than √n.
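In its eager form the algorithm fits in a few lines. The sketch below (a hypothetical helper, not the lazy implementation built later in this post) tries every candidate divisor rather than only primes, which is simpler but does redundant work:

```julia
# Eager trial division: returns the prime factors of n (n >= 2), with multiplicity.
# Trying every d (not only primes) is still correct: composite d never divide
# the remaining n, because their prime factors were already divided out.
function eager_trial_division(n)
    factors = typeof(n)[]
    d = 2
    while d * d <= n
        while n % d == 0
            push!(factors, d)
            n = div(n, d)
        end
        d += 1
    end
    n > 1 && push!(factors, n)  # whatever remains is prime
    factors
end

eager_trial_division(4294967297)  # [641, 6700417]
```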
To generate the prime numbers, I use the Sundaram sieve (cf. [the first post](/julia/euler7/)), but I exploit the iteration interface of Julia to have a lazy implementation of it.
Iteration Interface: a lazy implementation of Sundaram Sieve
Julia can be extended with new user-defined types, which can be integrated seamlessly into the language by implementing the right interfaces.
Consider for example the for loop
for i in obj
do something with i
end
It can work with any object implementing the iteration interface, i.e. the methods start, next and done.
That loop is syntactic sugar equivalent to
state = start(obj)
while !done(obj, state)
    i, state = next(obj, state)
    # do something with i
end
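As a minimal illustration of the protocol (using the same pre-0.7 start/next/done methods as the rest of this post), here is a hypothetical Squares type that lazily iterates over 1², 2², …, n²:

```julia
# A lazy iterator over the first n squares, via the start/next/done protocol
immutable Squares
    n::Int
end

# The state is the next index to yield
Base.start(s::Squares) = 1
Base.next(s::Squares, state) = (state^2, state + 1)
Base.done(s::Squares, state) = state > s.n

collect(Squares(4))  # [1, 4, 9, 16]
```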
To define a lazy version of the Sundaram sieve, I create a new composite type, containing the is_prime array of booleans (a bit array in this implementation), and the upper bound ub. The type is parametric, since the upper bound can be an Int or a BigInt.
In [2]:
immutable SundaramSieve{T}
    is_prime::BitArray
    ub::T
end
The iteration interface lets me track the status of the iteration via the state object, which can be anything, and which is passed around by the three methods.
I use as state the last index checked by the sieve.
I start at 1, so start returns 1.
If i is the current state, then done is true if there are no more primes between i and ub. But to know that, the sieve must have run past i. Thus, we keep the sieving process one step ahead of the prime returned at the current iteration:
it computes until 7 but it returns 5; at the next step it computes until 11 but it returns 7, and so on.
next receives the state, which allows it to compute the prime it has to return, and then it runs one more step of the sieve, until the next prime or ub.
In [3]:
sundaram_sieve(n::Integer) = SundaramSieve(trues(n), div(n, 2)+1)

# the state keeps the last index checked by the sieve; the first state is 1
Base.start(s::SundaramSieve) = 1
function Base.next(s::SundaramSieve, state)
    for i = state:s.ub
        step = i * 2 + 1
        for j = i+i*step:step:s.ub
            s.is_prime[j] = false
        end
        if s.is_prime[i]
            return (max(2, (state-1)*2+1), i+1)
        end
    end
    (max(2, (state-1)*2+1), s.ub)
end
Base.done(s::SundaramSieve, state) = state >= s.ub
done (generic function with 51 methods)
In [4]:
collect(sundaram_sieve(10))
4-element Array{Any,1}:
2
3
5
7
Another way of implementing lazily evaluated sequences is with coroutines.
Intro to Coroutines
Coroutines are functions whose evaluation can be suspended and resumed.
To create a coroutine, pass your function to the Task constructor.
To start, and resume, the execution of a coroutine, use the consume function.
The function wrapped inside produces data, and returns control to the caller of consume, using the produce function.
Task also implements the iteration interface, and iterating on it is equivalent to calling consume.
Julia’s runtime provides a scheduler to coordinate the execution of multiple coroutines.
Julia’s coroutines always run in a single process, and they are scheduled with a cooperative multitasking strategy: each coroutine has the responsibility of releasing control to the other coroutines, by executing a blocking operation (like reading from a Channel) or by calling yield or produce. For the moment we will use them just as iterators, but in the following sections we will see how to actually exploit them to coordinate the execution of parallel tasks.
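Here is a minimal sketch of produce/consume (a hypothetical example, using the Task API of the Julia version in this post; later versions replaced produce/consume with Channels):

```julia
# A coroutine lazily producing the first n Fibonacci numbers
function fibonacci(n)
    a, b = 0, 1
    for _ in 1:n
        produce(b)       # suspend here, handing b to the consumer
        a, b = b, a + b
    end
end

t = Task(() -> fibonacci(5))
consume(t)   # 1 -- runs the task until the first produce

# Iterating over a Task keeps calling consume until the task is done
collect(Task(() -> fibonacci(5)))  # [1, 1, 2, 3, 5]
```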
In [5]:
function trial_division(n)
    function _f()
        if n < 2
            return
        end
        ub::Integer = ceil(sqrt(n)) + 1
        for prime in sundaram_sieve(ub)
            while n % prime == 0
                produce(prime)
                n = div(n, prime)
            end
        end
        if n > 1
            produce(n)
        end
    end
    Task(_f)
end
trial_division (generic function with 1 method)
In [6]:
f5 = fermat(5)
print("$f5 = 1")
for factor in trial_division(f5)
print(" * $factor")
end
4294967297 = 1 * 641 * 6700417
Unfortunately we cannot run this algorithm for bigger n, since we are not able to store the sieve.
In [7]:
collect(trial_division(fermat(6)))
LoadError: MethodError: `convert` has no method matching convert(::Type{BitArray{N}}, ::BigInt)
This may have arisen from a call to the constructor BitArray{N}(...),
since type constructors fall back to convert methods.
Closest candidates are:
call{T}(::Type{T}, ::Any)
convert{T,N}(::Type{BitArray{N}}, !Matched::AbstractArray{T,N})
convert{T}(::Type{T}, !Matched::T)
...
while loading In[7], in expression starting on line 1
in schedule_and_wait at task.jl:343
in consume at task.jl:259
in collect at array.jl:260
in collect at array.jl:272
Julia parallel computing: Pollard’s rho algorithm
The function pollard_rho is almost a copy&paste of the Wikipedia pseudo-code.
pollard_rho finds a divisor of the argument n: it returns x and y, s.t. x * y == n.
In [8]:
g(x, n) = (x^2 + 1) % n

function pollard_rho(n)
    x, y, d = 2, 2, 1
    if n % 2 == 0
        d = 2
    elseif n % 3 == 0
        d = 3
    else
        while d == 1
            x = g(x, n)
            y = g(g(y, n), n)
            d = gcd(abs(x-y), n)
        end
    end
    d, div(n, d)
end
pollard_rho (generic function with 1 method)
It is already able to do more than our trial_division!
In [9]:
f6 = fermat(6)
x, y = pollard_rho(f6)
"$f6 = $x * $y"
"18446744073709551617 = 274177 * 67280421310721"
What it is not able to do is return all the prime factors of n. To do that, we must recursively apply pollard_rho to the factors x and y, if they are different from n and 1.
Those operations can be easily parallelized. In Julia, parallelism starts with cluster managers.
Cluster managers
ClusterManagers allow us to create, manage, and destroy sets of related processes.
Julia provides two of them:
- LocalManager, used to manage processes on a single machine
- SSHManager, to spawn processes on remote machines
Only the initial process, called master, has the right to spawn new processes.
To ask the LocalManager to run 3 processes, we use
In [11]:
missing_procs = 3 - nprocs() # nprocs() returns the number of running processes
if missing_procs > 0
    addprocs(missing_procs)
end
Now we can ask the ClusterManager to run functions on those processes. There is a rich API to do that, but the basic flow is
- you run a remote call, getting immediately a RemoteRef
- later on, you can fetch from the RemoteRef to get the actual result

A RemoteRef is an implementation of a more general interface, called Channel. Channels and shared memory (via mmap) are the basic building blocks for communication among tasks.
In [12]:
f = @spawn rand(10) # remote call
fetch(f) # fetch
10-element Array{Float64,1}:
0.975851
0.757001
0.327111
0.852844
0.150347
0.57112
0.420745
0.0726167
0.406822
0.196387
To run a remote call, the called function must be known to all the processes.
This is not the case for pollard_rho.
In general, we should start the cluster at startup, and load our functions from a module.
Here I will just re-evaluate the function using the @everywhere macro, which evaluates its argument on every process.
In [13]:
@everywhere g(x, n) = (x^2 + 1) % n

@everywhere function pollard_rho(n)
    x, y, d = 2, 2, 1
    if n % 2 == 0
        d = 2
    elseif n % 3 == 0
        d = 3
    else
        while d == 1
            x = g(x, n)
            y = g(g(y, n), n)
            d = gcd(abs(x-y), n)
        end
    end
    d, div(n, d)
end
In [14]:
f = @spawn pollard_rho(100)
fetch(f)
(2,50)
The API we are going to implement is the same as trial_division's: from the point of view of the user, it lazily produces the results. Under the hood, it runs the factorization in parallel.
To do this we need a scheduler function, which manages the execution of pollard_rho on the new factors as soon as they are found. This is where coroutines show their power.
Channels, and more on Coroutines
The basic data structure to handle communication and synchronization among tasks is the Channel.
Channels are shared queues that can be accessed by multiple readers, via fetch or take!, and by multiple writers, via put!.
The size of a Channel is finite: if there are no objects available, or if the channel is full, reading and writing are blocking operations.
A RemoteRef is a Channel of size 1.
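A small sketch of the blocking-queue behavior (illustrative only: the exact Channel constructor signature has changed across Julia versions):

```julia
# Channels are FIFO queues shared among tasks
c = Channel(2)            # a channel with room for 2 items
put!(c, "a"); put!(c, "b")  # a third put! would block until someone take!s

take!(c)  # "a" -- FIFO order
take!(c)  # "b" -- a further take! would now block until the next put!
```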
To coordinate the remote execution of pollard_rho, we will use coroutines again. Remember? Julia’s runtime provides a scheduler to coordinate the execution of multiple coroutines. The control flow switches from one coroutine to another in case of a blocking operation, like reading from a channel, or by calling yield or produce.
Our scheduler has the responsibility of running pollard_rho, to find new factors, and of synchronizing the execution of the tasks.
pollard_rho_factorization works in this way:
- It creates and returns a Channel, primes. Coroutines will push new prime numbers on it. When everything is done, it will be closed. The user gets the primes by iterating over it.
- It creates a task to run factor. The macro @schedule wraps the statement it receives in a Task, and schedules it. Once factor is done, it closes the primes channel. At that point the iteration will finish.
- factor creates a new task, using @sync. @sync is like @schedule, but it terminates only once all the tasks created inside it are done.
- This task runs pollard_rho remotely. fetch is blocking, so at that point the scheduler resumes another coroutine. Depending on the result, it either pushes a prime to the channel, or creates and schedules new coroutines running factor on the two factors. @async creates and schedules coroutines, but it does not wait for their termination. Here we actually create and run two parallel tasks.
In [15]:
function pollard_rho_factorization(n)
    function factor(n, primes)
        @sync begin
            ref = @spawn pollard_rho(n)
            x, y = fetch(ref)
            if x == 1
                put!(primes, y)
            elseif y == 1
                put!(primes, x)
            else
                @async factor(x, primes)
                @async factor(y, primes)
            end
        end
    end
    primes = Channel()
    @schedule begin
        factor(n, primes)
        close(primes)
    end
    primes
end
pollard_rho_factorization (generic function with 1 method)
In [16]:
test = reduce(*, map(fermat, 1:6))
print("$test = 1")
for factor in pollard_rho_factorization(test)
print(" * $factor")
end
113427455640312821154458202477256070485 = 1 * 5 * 17 * 257 * 641 * 274177 * 65537 * 6700417 * 67280421310721