Source code for tsnet.simulation.single

"""
The tsnet.simulation.single contains methods to perform MOC
transient simulation on a single pipe, including
1. inner pipe
2. left boundary pipe (without C- charateristic grid)
3. right boundary pipe (without C+ characteristic grid)

"""
import numpy as np
from tsnet.simulation.solver import (
    inner_node_steady,
    inner_node_quasisteady,
    inner_node_unsteady,
    valve_node,
    pump_node,
    source_pump,
    valve_end,
    dead_end,
    rev_end,
    add_leakage,
    surge_tank,
    air_chamber
)

[docs]def inner_pipe (linkp, pn, dt, links1, links2, utype, dtype, p, H0, V0, H, V, H10, V10, H20, V20, pump, valve, friction, dVdt, dVdx, dVdt10, dVdx10, dVdt20, dVdx20): """MOC solution for an individual inner pipe. Parameters ---------- linkp : object Current pipe object pn : int Current pipe ID dt : float Time step H : numpy.ndarray Head of current pipe at current time step [m] V : numpy.ndarray Velocity of current pipe at current time step [m/s] links1 : list Upstream adjacent pipes links2 : list Downstream adjacent pipes utype : list Upstream adjacent link type, and if not pipe, their name dtype : list Downstream adjacent link type, and if not pipe, their name p : list pipe list H0 : numpy.ndarray Head of current pipe at previous time step [m] V0 : numpy.ndarray Velocity of current pipe at previous time step [m/s] H10 : list Head of left adjacent nodes at previous time step [m] V10 : list Velocity of left adjacent nodes at previous time step [m/s] H20 : list Head of right adjacent nodes at previous time step [m] V20 : list Velocity of right adjacent nodes at previous time step [m/s] pump : list Characteristics of the pump valve : list Characteristics of the valve friction: str friction model, e.g., 'steady', 'quasi-steady', 'unsteady', by default 'steady' dVdt: numpy.ndarray local instantaneous acceleration approximation to be used for unsteady friction calculation, 0 if not in unsteady friction mode [m/s^2] dVdx: numpy.ndarray convective instantaneous acceleration approximation to be used for unsteady friction calculation, 0 if not in unsteady friction mode [m/s^2] dVdt10 : list local instantaneous acceleration of left adjacent nodes at previous time step [m] dVdx10 : list convective instantaneous acceleration of left adjacent nodes at previous time step [m/s] dVdt20 : list local instantaneous acceleration of right adjacent nodes at previous time step [m] dVdx20 : list convective instantaneous acceleration of right adjacent nodes at previous time step [m/s] Returns ------- H : numpy.ndarray Head results of the current pipe at current time step. [m] V : numpy.ndarray Velocity results of the current pipe at current time step. [m/s] """ # Properties of current pipe g = 9.8 # m/s^2 link1 = [p[abs(i)-1] for i in links1] link2 = [p[abs(i)-1] for i in links2] n = linkp.number_of_segments # spatial discretization # inner nodes if friction == 'steady': H[1:-1], V[1:-1] = inner_node_steady(linkp, H0, V0, dt, g) elif friction == 'quasi-steady': H[1:-1], V[1:-1] = inner_node_quasisteady(linkp, H0, V0, dt, g) else: H[1:-1], V[1:-1] = inner_node_unsteady(linkp, H0, V0, dt, g, dVdx, dVdt) # Pipe start V1 = V10; H1 = H10 #list V2 = V0[1]; H2 = H0[1] dVdx1 = dVdx10 ; dVdt1 = dVdt10 dVdx2 = dVdx[0]; dVdt2 = dVdt[1] if utype[0] == 'Pipe': if linkp.start_node.transient_node_type == 'SurgeTank': shape = linkp.start_node.tank_shape H[0], V[0], Qs = surge_tank(shape, link1, linkp, H1, V1, H2, V2, dt, g, 0, np.sign(links1), [-1], friction, dVdx1, dVdx2, dVdt1, dVdt2) linkp.start_node.water_level = H[0] linkp.start_node.tank_flow = Qs elif linkp.start_node.transient_node_type == 'Chamber': shape = linkp.start_node.tank_shape H[0], V[0], Qs, zp = air_chamber(shape, link1, linkp, H1, V1, H2, V2, dt, g, 0, np.sign(links1), [-1], friction, dVdx1, dVdx2, dVdt1, dVdt2) linkp.start_node.water_level = zp linkp.start_node.tank_flow = Qs else: elev = linkp.start_node.elevation emitter_coeff = linkp.start_node.emitter_coeff + linkp.start_node.demand_coeff block_per = linkp.start_node.block_per H[0], V[0] = add_leakage(emitter_coeff, block_per, link1, linkp, elev, H1, V1, H2, V2, dt, g, 0, np.sign(links1), [-1], friction, dVdx1, dVdx2, dVdt1, dVdt2) elif utype[0] == 'Pump': pumpc = pump[0] H[0], V[0] = pump_node(pumpc, link1, linkp, H1, V1, H2, V2, dt, g, 0, np.sign(links1), [-1], friction, dVdx1, dVdx2, dVdt1, dVdt2) elif utype[0] == 'Valve': valvec = valve[0] H[0], V[0] = valve_node(valvec, link1, linkp, H1, V1, H2, V2, dt, g, 0, np.sign(links1), [-1], friction, dVdx1, dVdx2, dVdt1, dVdt2) # Pipe end V1 = V0[n-1]; H1 = H0[n-1] V2 = V20; H2 = H20 dVdx1 = dVdx[n-1] ; dVdt1 = dVdt[n-1] dVdx2 = dVdx20; dVdt2 = dVdt20 if dtype[0] == 'Pipe': if linkp.end_node.transient_node_type == 'SurgeTank': shape = linkp.end_node.tank_shape H[n], V[n], Qs = surge_tank(shape, linkp, link2, H1, V1, H2, V2, dt, g, n, [1], np.sign(links2), friction, dVdx1, dVdx2, dVdt1, dVdt2) linkp.end_node.water_level = H[n] linkp.end_node.tank_flow = Qs elif linkp.end_node.transient_node_type == 'Chamber': shape = linkp.end_node.tank_shape H[n], V[n], Qs,zp = air_chamber(shape, linkp, link2, H1, V1, H2, V2, dt, g, n, [1], np.sign(links2), friction, dVdx1, dVdx2, dVdt1, dVdt2) linkp.end_node.water_level = zp linkp.end_node.tank_flow = Qs else: elev = linkp.end_node.elevation emitter_coeff = linkp.end_node.emitter_coeff + linkp.end_node.demand_coeff block_per = linkp.end_node.block_per H[n], V[n] = add_leakage(emitter_coeff, block_per,linkp, link2, elev, H1, V1, H2, V2, dt, g, n, [1], np.sign(links2), friction, dVdx1, dVdx2, dVdt1, dVdt2) elif dtype[0] == 'Pump': pumpc = pump[1] H[n], V[n] = pump_node(pumpc, linkp, link2, H1, V1, H2, V2, dt, g, n, [1], np.sign(links2), friction, dVdx1, dVdx2, dVdt1, dVdt2) elif dtype[0] == 'Valve': valvec = valve[1] H[n], V[n] = valve_node(valvec, linkp, link2, H1, V1, H2, V2, dt, g, n, [1], np.sign(links2), friction, dVdx1, dVdx2, dVdt1, dVdt2) return H, V
[docs]def left_boundary(linkp, pn, H, V, H0, V0, links2, p, pump, valve, dt, H20, V20, utype, dtype, friction, dVdt, dVdx, dVdt20, dVdx20) : """MOC solution for an individual left boundary pipe. Parameters ---------- linkp : object Current pipe object pn : int Current pipe ID H : numpy.ndarray Head of current pipe at current time step [m] V : numpy.ndarray Velocity of current pipe at current time step [m/s] links2 : list Downstream adjacent pipes p : list pipe list pump : list Characteristics of the pump valve : list Characteristics of the valve n : int Number of discretization of current pipe dt : float Time step H0 : numpy.ndarray Head of current pipe at previous time step [m] V0 : numpy.ndarray Velocity of current pipe at previous time step [m/s] H20 : list Head of right adjacent nodes at previous time step [m] V20 : list Velocity of right adjacent nodes at previous time step [m/s] utype : list Upstream adjacent link type, and if not pipe, their name dtype : list Downstream adjacent link type, and if not pipe, their name friction: str friction model, e.g., 'steady', 'quasi-steady', 'unsteady', by default 'steady' dVdt: numpy.ndarray local instantaneous velocity approximation to be used for unsteady friction calculation, 0 if not in unsteady friction mode [m/s^2] dVdx: numpy.ndarray convective instantaneous velocity approximation to be used for unsteady friction calculation, 0 if not in unsteady friction mode [m/s^2] dVdt20 : list local instantaneous acceleration of right adjacent nodes at previous time step [m] dVdx20 : list convective instantaneous acceleration of right adjacent nodes at previous time step [m/s] Returns ------- H : numpy.ndarray Head results of the current pipe at current time step. [m] V : numpy.ndarray Velocity results of the current pipe at current time step. [m/s] """ link2 = [p[abs(i)-1] for i in links2] # Properties of current pipe f = linkp.roughness # unitless D = linkp.diameter # m g = 9.8 # m/s^2 a = linkp.wavev # m/s n = linkp.number_of_segments # spatial discretization KD = linkp.roughness_height # inner nodes if friction == 'steady': H[1:-1], V[1:-1] = inner_node_steady(linkp, H0, V0, dt, g) elif friction == 'quasi-steady': H[1:-1], V[1:-1] = inner_node_quasisteady(linkp, H0, V0, dt, g) else: H[1:-1], V[1:-1] = inner_node_unsteady(linkp, H0, V0, dt, g, dVdx, dVdt) # Pipe start (outer boundayr conditions) V2 = V0[1]; H2 = H0[1] dVdx2 = dVdx[0]; dVdt2= dVdt[1] if utype[0] == 'Reservoir' or utype[0] == 'Tank': H[0], V[0] = rev_end (H2, V2, H[0], 0, a, g, f, D, dt, KD, friction, dVdx2, dVdt2) elif utype[0] == 'Valve': H[0], V[0] = valve_end (H2, V2, V[0], 0, a, g, f, D, dt, KD, friction, dVdx2, dVdt2) elif utype[0] == 'Junction': elev = linkp.start_node.elevation H[0], V[0] = dead_end (linkp , H2, V2, elev, 0, a, g, f, D, dt, KD, friction, dVdx2, dVdt2) elif utype[0] == 'Pump': #source pump H[0], V[0] = source_pump(pump[0], linkp, H2, V2, dt, g, [-1], friction, dVdx2, dVdt2) # Pipe end (inner boundary conditions) V1 = V0[n-1]; H1 = H0[n-1] # upstream node V2 = V20; H2 = H20 # downstream nodes dVdx1 = dVdx[n-1] ; dVdx2 = dVdx20 dVdt1 = dVdt[n-1] ; dVdt2 = dVdt20 if dtype[0] == 'Pipe': if linkp.end_node.transient_node_type == 'SurgeTank': shape = linkp.end_node.tank_shape H[n], V[n], Qs = surge_tank(shape, linkp, link2, H1, V1, H2, V2, dt, g, n, [1], np.sign(links2), friction, dVdx1, dVdx2, dVdt1, dVdt2) linkp.end_node.water_level = H[n] linkp.end_node.tank_flow = Qs elif linkp.end_node.transient_node_type == 'Chamber': shape = linkp.end_node.tank_shape H[n], V[n], Qs, zp = air_chamber(shape, linkp, link2, H1, V1, H2, V2, dt, g, n, [1], np.sign(links2), friction, dVdx1, dVdx2, dVdt1, dVdt2) linkp.end_node.water_level = zp linkp.end_node.tank_flow = Qs else: elev = linkp.end_node.elevation emitter_coeff = linkp.end_node.emitter_coeff + linkp.end_node.demand_coeff block_per = linkp.end_node.block_per H[n], V[n] = add_leakage(emitter_coeff, block_per,linkp, link2, elev, H1, V1, H2, V2, dt, g, n, [1], np.sign(links2), friction, dVdx1, dVdx2, dVdt1, dVdt2) elif dtype[0] == 'Pump': pumpc = pump[1] H[n], V[n] = pump_node(pumpc, linkp, link2, H1, V1, H2, V2, dt, g, n, [1], np.sign(links2), friction, dVdx1, dVdx2, dVdt1, dVdt2) elif dtype[0] == 'Valve': valvec = valve[1] if links2 == []: H[n], V[n] = valve_end (H1, V1, V[n], n, a, g, f, D, dt, KD, friction, dVdx1, dVdt1) else: H[n], V[n] = valve_node(valvec, linkp, link2, H1, V1, H2, V2, dt, g, n, [1], np.sign(links2), friction, dVdx1, dVdx2, dVdt1, dVdt2) elif dtype[0] == 'Junction': elev = linkp.end_node.elevation H[n], V[n] = dead_end (linkp, H1, V1, elev, n, a, g, f, D, dt, KD, friction, dVdx1, dVdt1) return H, V
[docs]def right_boundary(linkp, pn, H0, V0, H, V, links1, p, pump, valve, dt, H10, V10, utype, dtype, friction, dVdt, dVdx, dVdt10, dVdx10): """MOC solution for an individual right boundary pipe. Parameters ---------- linkp : object Current pipe object pn : int Current pipe ID H : numpy.ndarray Head of current pipe at current time step [m] V : numpy.ndarray Velocity of current pipe at current time step [m/s] links1 : list Upstream adjacent pipes p : list pipe list pump : list Characteristics of the pump valve : list Characteristics of the valve n : int Number of discretization of current pipe dt : float Time step H0 : numpy.ndarray Head of current pipe at previous time step [m] V0 : numpy.ndarray Velocity of current pipe at previous time step [m/s] H10 : list Head of left adjacent nodes at previous time step [m] V10 : list Velocity of left adjacent nodes at previous time step [m/s] utype : list Upstream adjacent link type, and if not pipe, their name dtype : list Downstream adjacent link type, and if not pipe, their name friction: str friction model, e.g., 'steady', 'quasi-steady', 'unsteady', by default 'steady' dVdt: numpy.ndarray local instantaneous velocity approximation to be used for unsteady friction calculation, 0 if not in unsteady friction mode [m/s^2] dVdx: numpy.ndarray convective instantaneous velocity approximation to be used for unsteady friction calculation, 0 if not in unsteady friction mode [m/s^2] dVdt10 : list local instantaneous acceleration of left adjacent nodes at previous time step [m] dVdx10 : list convective instantaneous acceleration of left adjacent nodes at previous time step [m/s] Returns ------- H : numpy.ndarray Head results of the current pipe at current time step. [m] V : numpy.ndarray Velocity results of the current pipe at current time step. [m/s] """ # Properties of current pipe link1 = [p[abs(i)-1] for i in links1] f = linkp.roughness # unitless D = linkp.diameter # m g = 9.8 # m/s^2 a = linkp.wavev # m/s n = linkp.number_of_segments # spatial discretization KD = linkp.roughness_height # inner nodes if friction == 'steady': H[1:-1], V[1:-1] = inner_node_steady(linkp, H0, V0, dt, g) elif friction == 'quasi-steady': H[1:-1], V[1:-1] = inner_node_quasisteady(linkp, H0, V0, dt, g) else: H[1:-1], V[1:-1] = inner_node_unsteady(linkp, H0, V0, dt, g, dVdx, dVdt) # Pipe start (inner boundary conditions) V1 = V10; H1 = H10 # upstream node V2 = V0[1]; H2 = H0[1] # downstream node dVdx1 = dVdx10 ; dVdx2 = dVdx[0] dVdt1 = dVdt10 ; dVdt2 = dVdt[1] if utype[0] == 'Pipe': if linkp.start_node.transient_node_type == 'SurgeTank': shape = linkp.start_node.tank_shape H[0], V[0], Qs = surge_tank(shape, link1, linkp, H1, V1, H2, V2, dt, g, 0, np.sign(links1), [-1], friction, dVdx1, dVdx2, dVdt1, dVdt2) linkp.start_node.water_level = H[0] linkp.start_node.tank_flow = Qs if linkp.start_node.transient_node_type == 'Chamber': shape = linkp.start_node.tank_shape H[0], V[0], Qs, zp = air_chamber(shape, link1, linkp, H1, V1, H2, V2, dt, g, 0, np.sign(links1), [-1], friction, dVdx1, dVdx2, dVdt1, dVdt2) linkp.start_node.water_level = zp linkp.start_node.tank_flow = Qs else: elev = linkp.start_node.elevation emitter_coeff = linkp.start_node.emitter_coeff + linkp.start_node.demand_coeff block_per = linkp.start_node.block_per H[0], V[0] = add_leakage(emitter_coeff, block_per,link1, linkp, elev, H1, V1, H2, V2, dt, g, 0, np.sign(links1), [-1], friction, dVdx1, dVdx2, dVdt1, dVdt2) elif utype[0] == 'Pump': pumpc = pump[0] H[0], V[0] = pump_node(pumpc, link1, linkp, H1, V1, H2, V2, dt, g, 0, np.sign(links1), [-1], friction, dVdx1, dVdx2, dVdt1, dVdt2) elif utype[0] == 'Valve': valvec = valve[0] H[0], V[0] = valve_node(valvec, link1, linkp, H1, V1, H2, V2, dt, g, 0, np.sign(links1), [-1], friction, dVdx1, dVdx2, dVdt1, dVdt2) # Pipe end (outer boundary conditions ) V1 = V0[n-1]; H1 = H0[n-1] dVdx1 = dVdx[n-1] dVdt1 = dVdt[n-1] if dtype[0] == 'Reservoir' or dtype[0] == 'Tank': H[n], V[n] = rev_end (H1, V1, H[n], n, a, g, f, D, dt, KD, friction, dVdx1, dVdt1) if dtype[0] == 'Valve': H[n], V[n] = valve_end (H1, V1, V[n], n, a, g, f, D, dt, KD, friction, dVdx1, dVdt1) if dtype[0] == 'Junction': elev = linkp.end_node.elevation H[n], V[n] = dead_end (linkp ,H1, V1, elev, n, a, g, f, D, dt, KD, friction, dVdx1, dVdt1) return H, V