Source code for tsnet.simulation.initialize

"""
The tsnet.simulation.initialize contains functions to
1. Initialize the list containing numpy arrays for velocity and head.
2. Calculate initial conditions using Epanet engine.
3. Calculate D-W coefficients based on initial conditions.
4. Calculate demand coefficients based on initial conditions.

"""

import wntr
import numpy as np
import warnings
from tsnet.utils import calc_parabola_vertex

[docs]def Initializer(tm, t0, engine='DD'): """Initial Condition Calculation. Initialize the list containing numpy arrays for velocity and head. Calculate initial conditions using Epanet engine. Calculate D-W coefficients based on initial conditions. Calculate demand coefficients based on initial conditions. Parameters ---------- tm : tsnet.network.geometry.TransientModel Simulated network t0 : float time to calculate initial condition engine : string steady state calculation engine: DD: demand driven; PDD: pressure dependent demand, by default DD Returns ------- tm : tsnet.network.geometry.TransientModel Network with updated parameters """ # adjust the time step and discretize each pipe tn = int(tm.simulation_period/tm.time_step) # Total time steps print ('Total Time Step in this simulation %s' %tn) # create new attributes for each pipe to store head and velocity results # at its start and end node. for _, pipe in tm.pipes(): pipe.start_node_head = np.zeros(tn) pipe.start_node_velocity = np.zeros(tn) pipe.start_node_flowrate = np.zeros(tn) pipe.end_node_head = np.zeros(tn) pipe.end_node_velocity = np.zeros(tn) pipe.end_node_flowrate = np.zeros(tn) # create new attributes for each node to store head and discharge results for _,node in tm.nodes(): node.demand_discharge = np.zeros(tn) node.emitter_discharge = np.zeros(tn) # calculate initial conditions using EPAnet engine for _,node in tm.nodes(): if node._leak_status == True: node.add_leak(tm, area=node.emitter_coeff/np.sqrt(2*9.81), discharge_coeff = 1, start_time = t0) if engine.lower() == 'dd': tm.options.hydraulic.demand_model = 'DD' sim = wntr.sim.EpanetSimulator(tm) results = sim.run_sim() elif engine.lower() == 'pdd': tm.options.hydraulic.demand_model = 'PDD' sim = wntr.sim.EpanetSimulator(tm) results = sim.run_sim() else: raise Exception("Unknown initial calculation engine. \ The engine can only be 'DD' or 'PDD'.") for _,node in tm.nodes(): node.initial_head = results.node['head'].loc[t0, node.name] for _,link in tm.links(): link.initial_flow = results.link['flowrate'].loc[t0, link.name] nu = 1.004e-6 for _, pipe in tm.pipes(): # assign the initial conditions to the latest result arrays V = np.sign(results.link['flowrate'].loc[t0, pipe.name])*\ results.link['velocity'].loc[t0, pipe.name]*\ np.ones(pipe.number_of_segments+1) H = [results.node['head'].loc[t0, pipe.start_node_name] +\ i* ((results.node['head'].loc[t0, pipe.end_node_name]- results.node['head'].loc[t0, pipe.start_node_name])/ (pipe.number_of_segments)) for i in range(pipe.number_of_segments+1)] H = np.array(H) pipe.initial_head = H pipe.initial_velocity = V pipe.initial_Re = np.abs(V[0]*pipe.diameter/nu) # assign the initial conditions to the results attributes pipe.start_node_velocity[0] = V[0] pipe.end_node_velocity[0] = V[-1] pipe.start_node_head[0] = H[0] pipe.end_node_head[0] = H[-1] pipe.start_node_flowrate[0] = V[0]*pipe.area pipe.end_node_flowrate[0] = V[-1]*pipe.area # calculate demand coefficient Hs = H[0] He = H[-1] demand = [0,0] # demand at start and end node try : demand[0] = results.node['demand'].loc[t0, pipe.start_node_name] # demand[0] = tm.nodes[pipe.start_node_name].demand_timeseries_list.at(t0) except: demand[0] = 0. try : demand[1] = results.node['demand'].loc[t0, pipe.end_node_name] # demand[1] = tm.nodes[pipe.end_node_name].demand_timeseries_list.at(t0) except : demand[1] = 0. try : Hsa = H[0] -pipe.start_node.elevation except: Hsa = 1. try : Hea = H[-1]- pipe.end_node.elevation except : Hea=1. pipe = cal_demand_coef(demand, pipe, Hsa, Hea, t0) # calculate demnad discharge and emitter discharge if pipe.start_node.node_type == 'Junction': pipe.start_node.emitter_discharge[0] = pipe.start_node.emitter_coeff * np.sqrt(Hsa) pipe.start_node.demand_discharge[0] = pipe.start_node.demand_coeff * np.sqrt(Hsa) if pipe.end_node.node_type == 'Junction': pipe.end_node.emitter_discharge[0] = pipe.end_node.emitter_coeff * np.sqrt(Hea) pipe.end_node.demand_discharge[0] = pipe.end_node.demand_coeff * np.sqrt(Hea) # calculate roughness coefficient Vp = V[0] hl = abs(Hs - He) pipe = cal_roughness_coef(pipe, Vp, hl ) # set initial conditions as a new attribute to TransientModel tm.initial_head = H tm.initial_velocity = V # add pump operation points tm = pump_operation_points(tm) return tm
[docs]def cal_demand_coef(demand, pipe, Hs, He, t0=0.): """Calculate the demand coefficient for the start and end node of the pipe. Parameters ---------- demand : list Demand at the start (demand[0]) and end demand[1] node pipe : object Pipe object Hs : float Head at the start node He : float Head at the end node t0 : float, optional Time to start initial condition calculation, by default 0 Returns ------- pipe : object Pipe object with calculated demand coefficient """ try: start_demand_coeff = demand[0]/ np.sqrt(Hs) except : start_demand_coeff = 0. try: end_demand_coeff = demand[1] / np.sqrt(He) except : end_demand_coeff = 0. pipe.start_node.demand_coeff = start_demand_coeff # [m^3/s/(m H20)^(1/2)] pipe.end_node.demand_coeff = end_demand_coeff # [m^3/s/(m H20)^(1/2)] return pipe
[docs]def cal_roughness_coef(pipe, V, hl): """Calculate the D-W roughness coefficient based on initial conditions. Parameters ---------- pipe : object Pipe object V : float Initial flow velocity in the pipe hl : float Initial head loss in the pipe Returns ------- pipe : object Pipe object with calculated D-W roughness coefficient. """ g = 9.8 H_tol = 1e-3 V_tol = 1e-5 if abs(V) >= V_tol and hl >= H_tol: pipe.roughness = hl / (pipe.length/pipe.diameter) / (V**2/2/g) else: pipe.roughness = 0 if pipe.roughness >0.08: warnings.warn("%s :the friction coefficient %.4f is too large. \ The D-W coeff has been set to 0.03 " %(pipe.name, pipe.roughness)) pipe.roughness = 0.03 if pipe.roughness!= 0: pipe.roughness_height = max(10**(-1/1.8/np.sqrt(pipe.roughness)) - 6.9/pipe.initial_Re, 0) else: pipe.roughness_height = 0 return pipe
[docs]def pump_operation_points(tm): #add operation points to the pump for _, pump in tm.pumps(): opt_point = (pump.initial_flow, abs(pump.end_node.initial_head-pump.start_node.initial_head)) def_points = pump.get_pump_curve().points # single-point pump curve if len(def_points) == 1: (flow, head) = def_points[0] def_points.append((0., 1.33*head)) def_points.append((2*flow, 0.)) elif len(def_points) != 3: raise Exception("TSNet only support one-point or three-point pump curve.") dist = [] for (i,j) in def_points: dist.append(np.sqrt((i - opt_point[0])**2 + (j - opt_point[1])**2)) pump.get_pump_curve().points.remove(def_points[dist.index(min(dist))]) pump.get_pump_curve().points.append(opt_point) pump.curve_coef = calc_parabola_vertex(pump.get_pump_curve().points) return tm