Source code for freegs.multigrid

"""
Multigrid solver for elliptic problems

Example
-------

$ python multigrid.py

This will run the test case, solving Poisson equation in 2D

Copyright 2016 Ben Dudson, University of York. Email: benjamin.dudson@york.ac.uk

This file is part of FreeGS.

FreeGS is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

FreeGS is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public License
along with FreeGS.  If not, see <http://www.gnu.org/licenses/>.

"""

from numpy import abs, max, reshape, zeros
from scipy.sparse import eye
from scipy.sparse.linalg import factorized


[docs] class MGDirect:
[docs] def __init__(self, A): self.solve = factorized(A.tocsc()) # LU decompose
def __call__(self, x, b): b1d = reshape(b, -1) # 1D view x = self.solve(b1d) return reshape(x, b.shape)
[docs] class MGJacobi:
[docs] def __init__(self, A, ncycle=4, niter=10, subsolver=None): """ Initialise solver A - The matrix to solve subsolver - An operator at lower resolution ncycle - Number of V-cycles niter - Number of Jacobi iterations """ self.A = A self.diag = A.diagonal() self.subsolver = subsolver self.niter = niter self.ncycle = ncycle self.sub_b = None self.xupdate = None
def __call__(self, xi, bi, ncycle=None, niter=None): """ Solve Ax = b, given initial guess for x ncycle - Optional number of cycles """ # Need to reshape x and b into 1D arrays x = reshape(xi, -1) b = reshape(bi, -1) if ncycle is None: ncycle = self.ncycle if niter is None: niter = self.niter for _c in range(ncycle): # Jacobi smoothing for _i in range(niter): x += (b - self.A.dot(x)) / self.diag if self.subsolver: # Calculate the error error = b - self.A.dot(x) # Restrict error onto coarser mesh self.sub_b = restrict(reshape(error, xi.shape)) # smooth this error sub_x = zeros(self.sub_b.shape) sub_x = self.subsolver(sub_x, self.sub_b) # Prolong the solution self.xupdate = interpolate(sub_x) x += reshape(self.xupdate, -1) # Jacobi smoothing for _i in range(niter): x += (b - self.A.dot(x)) / self.diag return x.reshape(xi.shape)
[docs] def createVcycle(nx, ny, generator, nlevels=4, ncycle=1, niter=10, direct=True): """ Create a hierarchy of solvers in a multigrid V-cycle nx, ny - The highest resolution generator(nx,ny) - Returns a sparse matrix, given resolution nlevels - Number of multigrid levels direct - Lowest level uses direct solver ncycle - Number of V cycles. This is only passed to the top level MGJacobi object niter - Number of Jacobi iterations per level """ if (nx - 1) % 2 == 1 or (ny - 1) % 2 == 1: # Can't divide any further nlevels = 1 if nlevels > 1: # Create the solver at lower resolution nxsub = (nx - 1) // 2 + 1 nysub = (ny - 1) // 2 + 1 subsolver = createVcycle( nxsub, nysub, generator, nlevels - 1, niter=niter, direct=direct ) # Create the sparse matrix A = generator(nx, ny) # Create the solver return MGJacobi(A, niter=niter, subsolver=subsolver, ncycle=ncycle) # At lowest level # Create the sparse matrix A = generator(nx, ny) if direct: return MGDirect(A) return MGJacobi(A, niter=niter, ncycle=ncycle, subsolver=None)
[docs] def smoothJacobi(A, x, b, dx, dy): """ Smooth the solution using Jacobi method """ if b.shape != x.shape: raise ValueError("b and x have different shapes") return x + (b - A(x, dx, dy)) / A.diag(dx, dy)
[docs] def restrict(orig, out=None, avg=False): """ Coarsen the original onto a coarser mesh Inputs ------ orig[nx,ny] - A 2D numpy array. Each dimension must have a size (2^n + 1) though nx != ny is possible Returns ------- numpy.ndarray A 2D numpy array of size [(nx-1)/2+1, (ny-1)/2+1] """ nx = orig.shape[0] ny = orig.shape[1] if (nx - 1) % 2 == 1 or (ny - 1) % 2 == 1: # Can't divide any further if out is None: return orig out.resize(orig.shape) out[:, :] = orig return None # Dividing x and y in 2 nx = (nx - 1) // 2 + 1 ny = (ny - 1) // 2 + 1 if out is None: out = zeros([nx, ny]) else: out.resize([nx, ny]) for x in range(1, nx - 1): for y in range(1, ny - 1): x0 = 2 * x y0 = 2 * y out[x, y] = orig[x0, y0] / 4.0 +( orig[x0 + 1, y0] + orig[x0 - 1, y0] + orig[x0, y0 + 1] + orig[x0, y0 - 1] ) / 8.0 +( orig[x0 - 1, y0 - 1] + orig[x0 - 1, y0 + 1] + orig[x0 + 1, y0 - 1] + orig[x0 + 1, y0 + 1] ) / 16.0 if not avg: out *= 4.0 return out
[docs] def interpolate(orig, out=None): """ Interpolate a solution onto a finer mesh """ nx = orig.shape[0] ny = orig.shape[1] nx2 = 2 * (nx - 1) + 1 ny2 = 2 * (ny - 1) + 1 if out is None: out = zeros([nx2, ny2]) else: out[:, :] = 0.0 for x in range(1, nx - 1): for y in range(1, ny - 1): x0 = 2 * x y0 = 2 * y out[x0 - 1, y0 - 1] += 0.25 * orig[x, y] out[x0 - 1, y0] += 0.5 * orig[x, y] out[x0 - 1, y0 + 1] += 0.25 * orig[x, y] out[x0, y0 - 1] += 0.5 * orig[x, y] out[x0, y0] = orig[x, y] out[x0, y0 + 1] += 0.5 * orig[x, y] out[x0 + 1, y0 - 1] += 0.25 * orig[x, y] out[x0 + 1, y0] += 0.5 * orig[x, y] out[x0 + 1, y0 + 1] += 0.25 * orig[x, y] return out
[docs] def smoothVcycle(A, x, b, dx, dy, niter=10, sublevels=0, direct=True): """ Perform smoothing using multigrid """ # Smooth for _i in range(niter): x = smoothJacobi(A, x, b, dx, dy) if sublevels > 0: # Calculate the error error = b - A(x, dx, dy) # Restrict error onto coarser mesh Cerror = restrict(error) # smooth this error Cx = zeros(Cerror.shape) Cx = smoothVcycle(A, Cx, Cerror, dx * 2.0, dy * 2.0, niter, sublevels - 1) # Prolong the solution xupdate = interpolate(Cx) x = x + xupdate # Smooth for _i in range(niter): x = smoothJacobi(A, x, b, dx, dy) return x
[docs] def smoothMG(A, x, b, dx, dy, niter=10, sublevels=1, ncycle=2): error = b - A(x, dx, dy) print(f"Starting max residual: {max(abs(error)):e}") for c in range(ncycle): x = smoothVcycle(A, x, b, dx, dy, niter, sublevels) error = b - A(x, dx, dy) print(f"Cycle {c} : {max(abs(error))}") return x
[docs] class LaplacianOp: """ Implements a simple Laplacian operator for use with the multigrid solver """ def __call__(self, f, dx, dy): nx = f.shape[0] ny = f.shape[1] b = zeros([nx, ny]) for x in range(1, nx - 1): for y in range(1, ny - 1): # Loop over points in the domain b[x, y] = (f[x - 1, y] - 2 * f[x, y] + f[x + 1, y]) / dx**2 + ( f[x, y - 1] - 2 * f[x, y] + f[x, y + 1] ) / dy**2 return b
[docs] def diag(self, dx, dy): return -2.0 / dx**2 - 2.0 / dy**2
[docs] class LaplaceSparse:
[docs] def __init__(self, Lx, Ly): self.Lx = Lx self.Ly = Ly
def __call__(self, nx, ny): dx = self.Lx / (nx - 1) dy = self.Ly / (ny - 1) # Create a linked list sparse matrix N = nx * ny A = eye(N, format="lil") for x in range(1, nx - 1): for y in range(1, ny - 1): row = x * ny + y A[row, row] = -2.0 / dx**2 - 2.0 / dy**2 # y-1 A[row, row - 1] = 1.0 / dy**2 # y+1 A[row, row + 1] = 1.0 / dy**2 # x-1 A[row, row - ny] = 1.0 / dx**2 # x+1 A[row, row + ny] = 1.0 / dx**2 # Convert to Compressed Sparse Row (CSR) format return A.tocsr()
if __name__ == "__main__": # Test case from timeit import default_timer as timer import matplotlib.pyplot as plt from numpy import exp, linspace, meshgrid nx = 65 ny = 65 dx = 1.0 / (nx - 1) dy = 1.0 / (ny - 1) xx, yy = meshgrid(linspace(0, 1, nx), linspace(0, 1, ny)) rhs = exp(-((xx - 0.5) ** 2 + (yy - 0.5) ** 2) / 0.4**2) rhs[0, :] = 0.0 rhs[:, 0] = 0.0 rhs[nx - 1, :] = 0.0 rhs[:, ny - 1] = 0.0 x = zeros([nx, ny]) x2 = x.copy() A = LaplacianOp() ################ SIMPLE ITERATIVE SOLVER ############## for i in range(1): x2 = smoothJacobi(A, x, rhs, dx, dy) x, x2 = x2, x # Swap arrays error = rhs - A(x, dx, dy) print(f"%d : {i:e}") ################ MULTIGRID SOLVER ####################### print("Python multigrid solver") x = zeros([nx, ny]) start = timer() x = smoothMG(A, x, rhs, dx, dy, niter=5, sublevels=3, ncycle=2) end = timer() error = rhs - A(x, dx, dy) print(f"Max error : {max(abs(error))}") print(f"Run time : {end - start} seconds") ################ SPARSE MATRIX ########################## print("Sparse matrix solver") x2 = zeros([nx, ny]) start = timer() solver = createVcycle( nx, ny, LaplaceSparse(1.0, 1.0), ncycle=2, niter=5, nlevels=4, direct=True ) start_solve = timer() x2 = solver(x2, rhs) end = timer() error = rhs - A(x2, dx, dy) print(f"Max error : {max(abs(error))}") print(f"Setup time: {start_solve - start}, run time: {end - start_solve} seconds") print(f"Values: {x2[10, 20]}, {x[10, 20]}") f = plt.figure() # plt.contourf(x) plt.plot(x[:, 32]) plt.plot(x2[:, 32]) plt.show()