from FluidSim.StaggeredArray import StaggeredArray2D import numpy as np import scipy import scipy.sparse import scipy.sparse.linalg class FluidSimulator2D: def __init__(self, x_n: int, y_n: int): self.x_n = x_n self.y_n = y_n self.array = StaggeredArray2D(self.x_n, self.y_n) self.coordinate_array = np.zeros((x_n, y_n, 2), dtype=np.int) for x in range(x_n): for y in range(y_n): self.coordinate_array[x, y, :] = x, y def advect(self, field: np.ndarray, delta_t: float): u_x, u_y = self.array.get_velocity_arrays() u = np.stack([u_x, u_y], axis=2) def runge_kutta_layer(input, time_elapsed, border_handling='clamp'): shifted_pos = np.round(self.coordinate_array - u * time_elapsed).astype(np.int) * (u < 0) +\ (self.coordinate_array - u * time_elapsed).astype(np.int) * (u > 0) + \ self.coordinate_array * (u == 0) # border handling if border_handling == 'clamp': shifted_pos = np.maximum(0, shifted_pos) shifted_pos[:, :, 0] = np.minimum(len(input), shifted_pos[:, :, 0]) shifted_pos[:, :, 1] = np.minimum(len(input[0]), shifted_pos[:, :, 1]) pass layer = np.zeros(field.shape, dtype=field.dtype) for x in range(self.x_n): for y in range(self.y_n): layer[x, y] = field[shifted_pos[x, y][0], shifted_pos[x, y][1]] - field[x, y] return layer k1 = runge_kutta_layer(field, delta_t) k2 = runge_kutta_layer(field + 0.5 * delta_t * k1, 0.5 * delta_t) k3 = runge_kutta_layer(field + 0.75 * delta_t * k2, 0.75 * delta_t) # maybe 0.25 instead? # new_field = field + 2.0 / 9.0 * delta_t * k1 + 3.0 / 9.0 * delta_t * k2 + 4.0 / 9.0 * delta_t * k3 new_field = field + k1 return new_field def get_timestep(self, f, h=1.0, k_cfl=1.0): f_length = np.max(np.sqrt(np.sum(np.square(f), axis=-1))) u_x, u_y = self.array.get_velocity_arrays() u_length = np.max(np.sqrt(np.square(u_x) + np.square(u_y))) # return k_cfl * h / (u_length + np.sqrt(h + f_length)) return k_cfl * h / u_length def update_velocity(self, timestep, border_handling='constant'): if border_handling == 'constant': p_diff_x = np.pad(self.array.p, [(0, 1), (0, 0)], mode='constant', constant_values=0) -\ np.pad(self.array.p, [(1, 0), (0, 0)], mode='constant', constant_values=0) borders_fluid_x = np.pad(self.array.has_fluid, [(0, 1), (0, 0)], mode='constant', constant_values=False) +\ np.pad(self.array.has_fluid, [(1, 0), (0, 0)], mode='constant', constant_values=False) p_diff_y = np.pad(self.array.p, [(0, 0), (0, 1)], mode='constant', constant_values=0) -\ np.pad(self.array.p, [(0, 0), (1, 0)], mode='constant', constant_values=0) borders_fluid_y = np.pad(self.array.has_fluid, [(0, 0), (0, 1)], mode='constant', constant_values=False) +\ np.pad(self.array.has_fluid, [(0, 0), (1, 0)], mode='constant', constant_values=False) else: p_diff_x = 0 p_diff_y = 0 borders_fluid_x = False borders_fluid_y = False u_x_new = self.array.u_x - (timestep * p_diff_x) * (1.0 * borders_fluid_x) u_y_new = self.array.u_y - (timestep * p_diff_y) * (1.0 * borders_fluid_y) # clear all components that do not border the fluid u_x_new *= (1.0 * borders_fluid_x) u_y_new *= (1.0 * borders_fluid_y) return u_x_new, u_y_new def calculate_divergence(self, h=1.0): dx_u = (self.array.u_x[1:, :] - self.array.u_x[:-1, :]) / h dy_u = (self.array.u_y[:, 1:] - self.array.u_y[:, -1]) / h return dx_u + dy_u def pressure_solve(self, divergence): new_p = np.zeros((self.array.x_n, self.array.y_n)) connection_matrix = np.zeros((self.array.x_n * self.array.y_n, self.array.x_n * self.array.y_n)) flat_divergence = np.zeros((self.array.x_n * self.array.y_n)) for x in range(self.array.x_n): for y in range(self.array.y_n): flat_divergence[x * self.array.y_n + y] = divergence[x, y] neighbors = 4 if x == 0: neighbors -= 1 else: connection_matrix[x * self.array.y_n + y, (x - 1) * self.array.y_n + y] = 1 if y == 0: neighbors -= 1 else: connection_matrix[x * self.array.y_n + y, x * self.array.y_n + y - 1] = 1 if x == self.array.x_n - 1: neighbors -= 1 else: connection_matrix[x * self.array.y_n + y, (x + 1) * self.array.y_n + y] = 1 if y == self.array.y_n - 1: neighbors -= 1 else: connection_matrix[x * self.array.y_n + y, x * self.array.y_n + y + 1] = 1 connection_matrix[x * self.array.y_n + y, x * self.array.y_n + y] = -neighbors p = scipy.sparse.linalg.spsolve(connection_matrix, -flat_divergence) for x in range(self.array.x_n): for y in range(self.array.y_n): new_p[x, y] = p[x * self.array.y_n + y] return new_p def timestep(self, external_f, h=1.0, k_cfl=1.0): """ :param external_f: external forces to be applied :param h: grid size :param k_cfl: speed up multiplier (reduces accuracy if > 1.0. Does not make much sense if smaller than 1.0) :return: """ delta_t = self.get_timestep(external_f, h, k_cfl) has_fluid = self.advect(self.array.has_fluid * 1.0, delta_t) self.array.density = self.advect(self.array.density, delta_t) # temp_u_x = self.advect(self.array.u_x, delta_t) # temp_u_y = self.advect(self.array.u_y, delta_t) # self.array.u_x = temp_u_x # self.array.u_y = temp_u_y #TODO advect velocity test_u = np.stack(self.array.get_velocity_arrays(), axis=2) test = self.advect(np.stack(self.array.get_velocity_arrays(), axis=2), delta_t) # TODO maybe use dynamic threshold to conserve amount of cells containing fluid self.array.has_fluid = has_fluid >= 0.5 # add more stuff to advect here. For example temperature, density, and other things. Maybe advect velocity. # self.array.u_x, self.array.u_y = self.update_velocity(delta_t) # TODO add forces (a = F / m) -> add a to u dx_u = (self.array.u_x[1:, :] - self.array.u_x[:-1, :]) / h dy_u = (self.array.u_y[:, 1:] - self.array.u_y[:, -1]) / h dx_u = self.advect(dx_u, delta_t) dy_u = self.advect(dy_u, delta_t) divergence = dx_u + dy_u # divergence = self.calculate_divergence(h) self.array.p = self.pressure_solve(divergence) self.array.u_x, self.array.u_y = self.update_velocity(delta_t) if __name__ == '__main__': fs = FluidSimulator2D(50, 50) fs.array.has_fluid[10, 10] = True for i in range(100): fs.timestep(0) print(i) print(fs.array.has_fluid[10, 10]) print(np.sum(fs.array.has_fluid * 1.0)) # test = fs.advect(fs.array.has_fluid * 1.0, 1.0) # # test2 = fs.update_velocity(1.0) # # test3 = fs.calculate_divergence() # # test4 = fs.pressure_solve(test3)