Parallel Computing
Most modern computers possess more than one CPU, and several computers can be combined in a cluster. Harnessing the power of these multiple CPUs allows many computations to be completed more quickly. Two major factors influence performance: the speed of the CPUs themselves, and the speed of their access to memory. In a cluster, it is fairly obvious that a given CPU will have fastest access to the RAM within the same computer (node). Perhaps more surprisingly, similar issues are very relevant on a typical multicore laptop, due to differences in the speed of main memory and the cache. Consequently, a good multiprocessing environment should allow control over the “ownership” of a chunk of memory by a particular CPU. Julia provides a multiprocessing environment based on message passing, allowing programs to run on multiple processors in separate memory domains at once.
Julia’s implementation of message passing is different from other environments such as MPI. Communication in Julia is generally “one-sided”, meaning that the programmer needs to explicitly manage only one processor in a two-processor operation. Furthermore, these operations typically do not look like “message send” and “message receive” but rather resemble higher-level operations like calls to user functions.
Parallel programming in Julia is built on two primitives: remote references and remote calls. A remote reference is an object that can be used from any processor to refer to an object stored on a particular processor. A remote call is a request by one processor to call a certain function on certain arguments on another (possibly the same) processor. A remote call returns a remote reference to its result. Remote calls return immediately; the processor that made the call proceeds to its next operation while the remote call happens somewhere else. You can wait for a remote call to finish by calling wait on its remote reference, and you can obtain the full value of the result using fetch.
Let’s try this out. Starting Julia with julia -p n provides n processors on the local machine. Generally it makes sense for n to equal the number of CPU cores on the machine.
$ ./julia -p 2
julia> r = remote_call(2, rand, 2, 2)
RemoteRef(2,1,0)
julia> s = remote_call(2, +, 1, r)
RemoteRef(2,1,1)
julia> fetch(r)
0.10824216411304866 0.13798233877923116
0.12376292706355074 0.18750497916607167
julia> fetch(s)
1.10824216411304866 1.13798233877923116
1.12376292706355074 1.18750497916607167
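As an aside, if you only need to know that a remote computation has finished, without transferring its value, you can wait on the reference instead (a minimal sketch):
wait(s)   # blocks until the remote call behind s has completed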
Occasionally you might want a remotely-computed value immediately. This typically happens when you read from a remote object to obtain data needed by the next local operation. The function remote_call_fetch exists for this purpose. It is equivalent to fetch(remote_call(...)) but is more efficient.
julia> remote_call_fetch(2, ref, r, 1, 1)
0.10824216411304866
The syntax of remote_call is not especially convenient. The macro @spawn makes things easier. It operates on an expression rather than a function, and picks where to do the operation for you:
julia> r = @spawn rand(2,2)
RemoteRef(1,1,0)
julia> s = @spawn 1+fetch(r)
RemoteRef(1,1,1)
julia> fetch(s)
1.10824216411304866 1.13798233877923116
1.12376292706355074 1.18750497916607167
(It is worth noting that @spawn is not built-in but defined in Julia as a macro. It is possible to define your own such constructs.)
One important point is that your code must be available on any processor that runs it. For example, type the following into the julia prompt:
julia> function rand2(dims...)
         return 2*rand(dims...)
       end
julia> rand2(2,2)
2x2 Float64 Array:
0.153756 0.368514
1.15119 0.918912
julia> @spawn rand2(2,2)
RemoteRef(1,1,1)
julia> @spawn rand2(2,2)
RemoteRef(2,1,2)
julia> exception on 2: in anonymous: rand2 not defined
Processor 1 knew about the function rand2, but processor 2 did not. To make code available everywhere, the load function loads a source file on all currently available processors:
julia> load("myfile.jl")
Data Movement
Sending messages and moving data constitute most of the overhead in a parallel program. Reducing the number of messages and the amount of data sent is critical to achieving performance and scalability. To this end, it is important to understand the data movement performed by Julia’s various parallel programming constructs.
fetch can be considered an explicit data movement operation, since it directly asks that an object be moved to the local machine. @spawn (and a few related constructs) also moves data, but less obviously, so it can be called an implicit data movement operation. Consider these two approaches to constructing and squaring a random matrix:
# method 1
A = rand(1000,1000)
Bref = @spawn A^2
...
fetch(Bref)
# method 2
Bref = @spawn rand(1000,1000)^2
...
fetch(Bref)
In this toy example, the two methods are easy to distinguish and choose from. However, in a real program, designing data movement might require more thought and very likely some measurement. For example, if the first processor needs matrix A then the first method might be better. Or, if computing A is expensive and only the current processor has it, then moving it to another processor might be unavoidable. Or, if the current processor has very little to do between the @spawn and fetch(Bref), then it might be better to eliminate the parallelism altogether. Or imagine rand(1000,1000) is replaced with a more expensive operation. Then it might make sense to add another @spawn statement just for this step.
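For instance, if constructing the matrix were the expensive step, the work could be split across two @spawn statements along the lines of this sketch (Aref and Bref are just illustrative names):
# method 3 (sketch): spawn the construction and the squaring separately
Aref = @spawn rand(1000,1000)   # imagine this were a more expensive operation
Bref = @spawn fetch(Aref)^2
...
fetch(Bref)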
Parallel Map and Loops
Fortunately, many useful parallel computations do not require data movement. A common example is a Monte Carlo simulation, where multiple processors can handle independent simulation trials simultaneously. We can use @spawn to flip coins on two processors:
function count_heads(n)
    c::Int = 0
    for i=1:n
        c += randbit()
    end
    c
end
a = @spawn count_heads(100000000)
b = @spawn count_heads(100000000)
fetch(a)+fetch(b)
This example, as simple as it is, demonstrates a powerful and often-used parallel programming pattern. Many iterations run independently over several processors, and then their results are combined using some function. The combination process is called a reduction, since it is generally tensor-rank-reducing: a vector of numbers is reduced to a single number, or a matrix is reduced to a single row or column, etc. In code, this typically looks like the pattern x = f(x,v[i]), where x is the accumulator, f is the reduction function, and the v[i] are the elements being reduced. It is desirable for f to be associative, so that it does not matter what order the operations are performed in.
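In serial code, this pattern with f = + looks like the following sketch:
v = rand(10)          # the elements being reduced
x = v[1]              # the accumulator
for i = 2:length(v)
    x = x + v[i]      # x = f(x, v[i]) with f = +
end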
Notice that our use of this pattern with count_heads can be generalized. We used two explicit @spawn statements, which limits the parallelism to two processors. To run on any number of processors, we can use a parallel for loop, which can be written in Julia like this:
nheads = @parallel (+) for i=1:200000000
    randbit()
end
Note that although parallel for loops look like serial for loops, their behavior is dramatically different. In particular, the iterations do not happen in a specified order, and writes to variables or arrays will not be globally visible since iterations run on different processors. Any variables used inside the parallel loop will be copied and broadcast to each processor.
For example, the following code will not work as intended:
a = zeros(100000)
@parallel for i=1:100000
    a[i] = i
end
Each iteration writes to its processor’s own copy of a, so the original array is left unmodified. Using “outside” variables in parallel loops is perfectly reasonable, however, when the variables are read-only:
a = randn(1000)
@parallel (+) for i=1:100000
    f(a[randi(end)])
end
In some cases no reduction operator is needed, and we merely wish to apply a function to all integers in some range (or, more generally, to all elements in some collection). This is another useful operation called parallel map, implemented in Julia as the pmap function. For example, we could compute the singular values of several large random matrices in parallel as follows:
M = {rand(1000,1000) for i=1:10}
pmap(svd, M)
Distributed Arrays
Large computations are often organized around large arrays of data. In these cases, a particularly natural way to obtain parallelism is to distribute arrays among several processors. This combines the memory resources of multiple machines, allowing use of arrays too large to fit on one machine. Each processor operates on the part of the array it owns, providing a ready answer to the question of how a program should be divided among machines.
A distributed array (or, more generally, a global object) is logically a single array, but pieces of it are stored on different processors. This means whole-array operations such as matrix multiply, scalar*array multiplication, etc. use the same syntax as with local arrays, and the parallelism is invisible. In some cases it is possible to obtain useful parallelism just by changing a local array to a distributed array.
Julia distributed arrays are implemented by the DArray type. A DArray has an element type and dimensions just like an Array, but it also needs an additional property: the dimension along which data is distributed. There are many possible ways to distribute data among processors, but at this time Julia keeps things simple and only allows distributing along a single dimension. For example, if a 2-d DArray is distributed in dimension 1, each processor holds a certain range of rows. If it is distributed in dimension 2, each processor holds a certain range of columns.
Common kinds of arrays can be constructed with functions beginning with d:
dzeros(100,100,10)
dones(100,100,10)
drand(100,100,10)
drandn(100,100,10)
dcell(100,100,10)
dfill(x, 100,100,10)
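# In the remaining forms, the argument after the dimensions tuple is the
# distributed dimension, and an optional final argument is the list of
# processor ids to use (matching the darray signature described below):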
drand((100,100,10), 3)
dzeros(Int64, (100,100), 2)
dzeros((100,100), 2, [7, 8])
distribute(a::Array, dim) can be used to convert a local array to a distributed array, optionally specifying the distributed dimension. localize(a::DArray) can be used to obtain the locally-stored portion of a DArray. owner(a::DArray, index) gives the id of the processor storing the given index in the distributed dimension. myindexes(a::DArray) gives a tuple of the indexes owned by the local processor. convert(Array, a::DArray) brings all the data to one node.
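Here is a sketch of these functions in action, assuming two processors, with processor 1 owning rows 1:50 of a 100x100 array distributed in dimension 1 (the values in the comments depend on that assumption):
d = distribute(rand(100,100), 1)   # convert a local array, distributing along dimension 1
localize(d)                        # the 50x100 block stored on this processor
owner(d, 75)                       # id of the processor owning row 75; here 2
myindexes(d)                       # (1:50, 1:100) on processor 1
a = convert(Array, d)              # gather all the data into one local Array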
A DArray can be stored on a subset of the available processors. Three properties fully describe the distribution of DArray d. d.pmap[i] gives the processor id that owns piece number i of the array. Piece i consists of indexes d.dist[i] through d.dist[i+1]-1. distdim(d) gives the distributed dimension. For convenience, d.localpiece gives the number of the piece owned by the local processor (this could also be determined by searching d.pmap). The array d.pmap is also available as procs(d).
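For example, under the same two-processor assumption as above:
d = drand((100,100), 1)
d.pmap          # e.g. [1, 2]: the processor ids owning pieces 1 and 2
d.dist          # e.g. [1, 51, 101]: piece i covers rows d.dist[i] through d.dist[i+1]-1
distdim(d)      # 1
d.localpiece    # 1 on processor 1, 2 on processor 2
procs(d)        # the same vector as d.pmap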
Indexing a DArray (square brackets) gathers all of the referenced data to a local Array object.
Indexing a DArray with the sub function creates a “virtual” sub-array that leaves all of the data in place. This should be used where possible, especially for indexing operations that refer to large pieces of the original array.
sub itself, naturally, does no communication and so is very efficient. However, this does not mean it should be viewed as an optimization in all cases. Many situations require explicitly moving data to the local processor in order to do a fast serial operation. For example, functions like matrix multiply perform many accesses to their input data, so it is better to have all the data available locally up front.
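To make the distinction concrete (a sketch):
d = drand((1000,1000), 2)
x = d[1:10, 1:10]        # square brackets gather the referenced data into a local Array
v = sub(d, 1:10, 1:10)   # a "virtual" sub-array; no data is moved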
Constructing Distributed Arrays
The primitive DArray constructor is the function darray, which has the following somewhat elaborate signature:
darray(init, type, dims, distdim, procs, dist)
init is a function of three arguments (T, d, da) that runs on each participating processor and returns an Array holding that processor’s local piece of the data; T is the element type, d is the dimensions of the local piece, and da is the DArray being constructed (as the examples below illustrate).
type is the element type.
dims is the dimensions of the entire DArray.
distdim is the dimension to distribute in.
procs is a vector of processor ids to use.
dist is a vector giving the first index of each contiguous distributed piece, such that the nth piece consists of indexes dist[n] through dist[n+1]-1. If you have a vector v of the sizes of the pieces, dist can be computed as cumsum([1,v]).
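For example, with three pieces of sizes 30, 30, and 40:
v = [30, 30, 40]
dist = cumsum([1,v])    # gives [1, 31, 61, 101]
# piece 1 covers indexes 1:30, piece 2 covers 31:60, piece 3 covers 61:100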
The last three arguments are optional, and defaults will be used if they are omitted. The first argument, the init function, can also be omitted, in which case an uninitialized DArray is constructed.
As an example, here is how to turn the local array constructor rand into a distributed array constructor:
drand(args...) = darray((T,d,da)->rand(d), Float64, args...)
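Following the same pattern, a constructor similar to the built-in dones could be sketched as follows (dones2 is a hypothetical name, not the actual library definition):
dones2(args...) = darray((T,d,da)->ones(d), Float64, args...)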
The changedist function, which changes the distribution of a DArray, can be implemented with one call to darray where the init function uses indexing to gather data from the existing array:
function changedist(A::DArray, to_dist)
    return darray((T,sz,da)->A[myindexes(da)...],
                  eltype(A), size(A), to_dist, procs(A))
end
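For example (a sketch), redistributing a row-distributed array by columns:
B = changedist(A, 2)    # same data as A, but distributed along dimension 2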
darray also accepts a function and an existing DArray, applying the function to produce a new DArray. Unary minus for distributed arrays, for example, can be defined in one line:
-(A::DArray) = darray(-, A)
Distributed Array Computations
Whole-array operations (e.g. elementwise operators) are a convenient way to use distributed arrays, but they are not always sufficient. To handle more complex problems, tasks can be spawned to operate on parts of a DArray and write the results to another DArray. For example, here is how you could apply a function f to each 2-d slice of a 3-d DArray:
function compute_something(A::DArray)
    B = darray(eltype(A), size(A), 3)
    for i = 1:size(A,3)
        @spawnat owner(B,i) B[:,:,i] = f(A[:,:,i])
    end
    B
end
This code works in some sense, but trouble stems from the fact that it performs writes asynchronously. In other words, we don’t know when the result data will be written to the array and become ready for further processing. This is known as a “race condition”, one of the famous pitfalls of parallel programming. Some form of synchronization is necessary to wait for the result. As we saw above, @spawn returns a remote reference that can be used to wait for its computation. We could use that feature to wait for specific blocks of work to complete:
function compute_something(A::DArray)
    B = darray(eltype(A), size(A), 3)
    deps = cell(size(A,3))
    for i = 1:size(A,3)
        deps[i] = @spawnat owner(B,i) B[:,:,i] = f(A[:,:,i])
    end
    (B, deps)
end
Another option is to use a @sync block, as follows:
function compute_something(A::DArray)
    B = darray(eltype(A), size(A), 3)
    @sync begin
        for i = 1:size(A,3)
            @spawnat owner(B,i) B[:,:,i] = f(A[:,:,i])
        end
    end
    B
end
Still another option is to use the initial, un-synchronized version of the code, and place a @sync block around a larger set of operations in the function calling this one.
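For example, a caller could wrap its use of the un-synchronized compute_something in its own @sync block, along the lines of this sketch (use_result is a hypothetical name):
function use_result(A::DArray)
    local B
    @sync begin
        B = compute_something(A)   # the un-synchronized version above
        # other operations that spawn work could also go here
    end
    # all writes spawned within the @sync block have completed by this point
    B
end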
Synchronization With Remote References
Scheduling
Julia’s parallel programming platform uses Tasks (aka Coroutines) to switch among multiple computations. Whenever code performs a communication operation like fetch or wait, the current task is suspended and a scheduler picks another task to run. A task is restarted when the event it is waiting for completes.
For many problems, it is not necessary to think about tasks directly. However, they can be used to wait for multiple events at the same time, which provides for dynamic scheduling. In dynamic scheduling, a program decides what to compute or where to compute it based on when other jobs finish. This is needed for unpredictable or unbalanced workloads, where we want to assign more work to processors only when they finish their current tasks.
As an example, consider computing the singular values of matrices of different sizes:
M = {rand(800,800), rand(600,600), rand(800,800), rand(600,600)}
pmap(svd, M)
If one processor handles both 800x800 matrices while another handles both 600x600 matrices, we will not get as much scalability as we could. The solution is to have a local task “feed” work to each processor as it finishes its current job, which is exactly how pmap is implemented:
function pmap(f, lst)
    np = nprocs()  # determine the number of processors available
    n = length(lst)
    results = cell(n)
    i = 1
    # function to produce the next work item from the queue.
    # in this case it's just an index.
    next_idx() = (idx=i; i+=1; idx)
    @sync begin
        for p=1:np
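            # @spawnlocal starts a task on the local processor; each of these
            # "feeder" tasks hands work items to one processor, requesting the
            # next item only after the previous one has finished.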
            @spawnlocal begin
                while true
                    idx = next_idx()
                    if idx > n
                        break
                    end
                    results[idx] = remote_call_fetch(p, f, lst[idx])
                end
            end
        end
    end
    results
end