"""
The tsnet.network.geometry read in the geometry defined by EPANet
.inp file, and assign additional parameters needed in transient
simulation later in tsnet.
"""
from __future__ import print_function
import wntr
from wntr.network.elements import LinkStatus
import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import logging
import warnings
from wntr.network import WaterNetworkModel
from tsnet.network.discretize import (
discretization, max_time_step,
discretization_N, max_time_step_N)
from tsnet.network.control import (
valveclosing,
valveopening,
pumpclosing,
pumpopening,
burstsetting,
demandpulse
)
from tsnet.postprocessing import detect_cusum
logger = logging.getLogger(__name__)
[docs]class TransientModel (WaterNetworkModel):
""" Transient model class.
Parameters
-------------------
inp_file_name: string
Directory and filename of EPANET inp file to load into the
WaterNetworkModel object.
"""
def __init__ (self, inp_file):
super().__init__(inp_file)
self.simulation_timestamps = []
self.time_step = 0.
self.simulation_period = 0.
self.initial_velocity = []
self.initial_head = []
# assign ID to each links, start from 1.
i =1
for _, link in self.links():
link.id = i
i+=1
for _,valve in self.valves():
valve.valve_coeff = None
# assign ID to each links, start from 1.
i =1
for _, node in self.nodes():
node.id = i
node._leak_status = False
node.burst_status = False
node.blockage_status = False
node.pulse_status = False
node.emitter_coeff = 0.
node.block_per = 0.
node.transient_node_type = node.node_type
i+=1
# calculate the slope and area for each pipe
for _, pipe in self.pipes():
pipe.area = pipe.diameter**2. * np.pi / 4.
try:
theta = np.sin(np.arctan(pipe.end_node.elevation -
pipe.start_node.elevation)/pipe.length)
except:
theta = 0.0
pipe.theta = theta
# set operating default value as False
for _, link in self.links():
link.operating = False
[docs] def set_wavespeed(self, wavespeed=1200, pipes=None):
"""Set wave speed for pipes in the network
Parameters
----------
wavespeed : float or int or list, optional
If given as float or int, set the value as wavespeed
for all pipe; If given as list set the corresponding
value to each pipe, by default 1200.
pipes : str or list, optional
The list of pipe to define wavespeed,
by default all pipe in the network.
"""
generator = 0
if pipes == None :
generator = 1
pipes = self.pipes()
num_pipes = self.num_pipes
else:
pipes = [self.get_link(pipe) for pipe in list(pipes)]
num_pipes = len(pipes)
if isinstance(wavespeed,(float,int)):
# if wavespeed is a float, assign it to all pipes
wavespeed = wavespeed * np.ones(num_pipes)
elif isinstance(wavespeed, (list,tuple,np.ndarray)):
# if wavespeed is a list, assign each elements
# to the respective pipes.
if not len(wavespeed) == num_pipes:
raise ValueError('The length of the wavespeed \
input does not equal number of pipes. ')
else:
raise ValueError('Wavespeed should be a float or a list')
# assign wave speed to each pipes
i= 0
if generator == 1:
for _, pipe in pipes:
pipe.wavev = wavespeed[i]
i+=1
else:
for pipe in pipes:
pipe.wavev = wavespeed[i]
i+=1
[docs] def set_roughness(self,roughness, pipes=None):
"""Set roughness coefficient for pipes in the network
Parameters
----------
roughness : float or int or list
If given as float or int, set the value as roughness
for all pipe; If given as list set the corresponding
value to each pipe. Make sure to define it using the
same method (H-W or D-W) as defined in .inp file.
pipes : str or list, optional
The list of pipe to define roughness coefficient,
by default all pipe in the network.
"""
generator = 0
if pipes == None :
generator = 1
pipes = self.pipes()
num_pipes = self.num_pipes
else:
pipes = [self.get_link(pipe) for pipe in list(pipes)]
num_pipes = len(pipes)
if isinstance(roughness,(float,int)):
# if roughness is a float, assign it to all mentioned pipes
roughness = roughness * np.ones(num_pipes)
elif isinstance(roughness, (list,tuple,np.ndarray)):
# if roughness is a list, assign each elements
# to the respective pipes.
if not len(roughness) == num_pipes:
raise ValueError('The length of the roughness \
input does not equal number of input pipes. ')
else:
raise ValueError('Roughness should be a float or a list')
# assign roughness to each input pipes
i= 0
if generator == 1:
for _, pipe in pipes:
pipe.roughness = roughness[i]
i+=1
else:
for pipe in pipes:
pipe.roughness = roughness[i]
i+=1
[docs] def set_time(self, tf, dt=None):
"""Set time step and duration for the simulation.
Parameters
----------
tf : float
Simulation period
dt : float, optional
time step, by default maximum allowed dt
"""
if dt == None:
dt = max_time_step(self)
self.simulation_period = tf
self = discretization(self, dt)
print('Simulation time step %.5f s' % self.time_step)
[docs] def set_time_N(self, tf, N=2):
"""Set time step and duration for the simulation.
Parameters
----------
tf : float
Simulation period
N : integer
Number of segments in the critical pipe
"""
dt = max_time_step_N(self,N)
self.simulation_period = tf
self = discretization_N(self, dt)
print('Simulation time step %.5f s' % self.time_step)
[docs] def add_leak(self, name, coeff):
"""Add leak to the transient model
Parameters
----------
name : str, optional
The name of the leak nodes, by default None
coeff : list or float, optional
Emitter coefficient at the leak nodes, by default None
"""
leak_node = self.get_node(name)
leak_node.emitter_coeff += coeff
leak_node._leak_status = True
[docs] def add_burst(self, name, ts, tc, final_burst_coeff):
"""Add leak to the transient model
Parameters
----------
name : str
The name of the leak nodes, by default None
ts : float
Burst start time
tc : float
Time for burst to fully develop
final_burst_coeff : list or float
Final emitter coefficient at the burst nodes
"""
burst_node = self.get_node(name)
burst_node.burst_coeff = burstsetting(self.time_step, self.simulation_period,
ts, tc, final_burst_coeff)
burst_node.burst_status = True
[docs] def add_blockage(self, name, percentage):
"""Add blockage to the transient model
Parameters
----------
name : str
The name of the blockage nodes, by default None
percentage : list or float
The percentage of the blockage flow discharge
"""
blockage_node = self.get_node(name)
blockage_node.block_per = percentage
blockage_node.block_status = True
[docs] def valve_closure(self, name, rule, curve=None):
"""Set valve closure rule
Parameters
----------
name : str
The name of the valve to close
rule : list
Contains paramters to define valve operation rule
rule = [tc,ts,se,m]
tc : the duration takes to close the valve [s]
ts : closure start time [s]
se : final open percentage [s]
m : closure constant [unitless]
curve: list
[(open_percentage[i], 1/kl[i]) for i ]
List of open percentage and the corresponding
inverse of valve coefficient
"""
valve = self.get_link(name)
if valve.link_type.lower() != 'valve':
raise RuntimeError('The name of valve to operate is not associated with a vale')
if valve.status.name == 'Closed':
warnings.warn("Valve %s is already closed in its initial setting. \
The initial setting has been changed to open to perform the closure." %name)
valve.status = LinkStatus.Open
valve.operating = True
valve.operation_rule = valveclosing(self.time_step, self.simulation_period, rule)
if curve == None:
valve.valve_coeff = None
else:
p = [i for (i,j) in curve]
kl = [j for (i,j) in curve]
valve.valve_coeff = [p,kl]
[docs] def valve_opening(self, name, rule, curve=None):
"""Set valve opening rule
Parameters
----------
name : str
The name of the valve to close
rule : list
Contains paramters to define valve operation rule
rule = [tc,ts,se,m]
tc : the duration takes to open the valve [s]
ts : opening start time [s]
se : final open percentage [s]
m : closure constant [unitless]
curve: list
[(open_percentage[i], kl[i]) for i ]
List of open percentage and the corresponding
valve coefficient
"""
valve = self.get_link(name)
if valve.link_type.lower() != 'valve':
raise RuntimeError('The name of valve to operate is not associated with a vale')
if valve.initial_status.name == 'Open' or valve.initial_status.name == 'Active':
warnings.warn("Valve %s is already open in its initial setting. \
The initial setting has been changed to closed to perform the opening." %name)
valve.status = LinkStatus.Closed
valve.operating = True
valve.operation_rule = valveopening(self.time_step, self.simulation_period, rule)
if curve == None:
valve.valve_coeff = None
else:
p = [i for (i,j) in curve]
kl = [j for (i,j) in curve]
valve.valve_coeff = [p,kl]
[docs] def pump_shut_off(self, name, rule):
"""Set pump shut off rule
Parameters
----------
name : str
The name of the pump to shut off
rule : list
Contains paramaters to define valve operation rule
rule = [tc,ts,se,m]
tc : the duration takes to close the pump [s]
ts : closure start time [s]
se : final open percentage [s]
m : closure constant [unitless]
"""
pump = self.get_link(name)
if pump.link_type.lower() != 'pump':
raise RuntimeError('The name of pump to operate is not associated with a pump')
if pump.initial_status.name == 'Closed':
warnings.warn("Pump %s is already closed in its initial setting. \
The initial setting has been changed to open to perform the closure." %name)
pump.status= LinkStatus.Open
pump.operating = True
pump.operation_rule = pumpclosing(self.time_step, self.simulation_period, rule)
[docs] def pump_start_up(self, name, rule):
"""Set pump start up rule
Parameters
----------
name : str
The name of the pump to shut off
rule : list
Contains paramaters to define valve operation rule
rule = [tc,ts,se,m]
tc : the duration takes to close the valve [s]
ts : closure start time [s]
se : final open percentage [s]
m : closure constant [unitless]
"""
pump = self.get_link(name)
if pump.link_type.lower() != 'pump':
raise RuntimeError('The name of pump to operate is not associated with a pump')
# Turn the pump on and run initial calculation
# to get the nominal flow and head
pump.status = LinkStatus.Open
sim = wntr.sim.EpanetSimulator(self)
results = sim.run_sim()
pump.nominal_flow = results.link['flowrate'].loc[0,name]
node1 = self.links[name].start_node.name
node2 = self.links[name].end_node.name
pump.nominal_pump_head = abs(results.node['head'].loc[0,node1]-
results.node['head'].loc[0,node2])
# Turn the pump back to closed
pump.status = LinkStatus.Closed
pump.operating = True
pump.operation_rule = pumpopening(self.time_step, self.simulation_period, rule)
[docs] def add_demand_pulse(self, name, rule):
""" Add demand pulse to junction
Parameters
----------
name : str or list
The name of junctions to add demand pulse
rule : list
Contains paramters to define valve operation rule
rule = [tc,ts,stay,dp,m]
tc : total duration of the pulse [s] [s]
ts : start time of demand [s]
stay: duration of the demand to stay at peak level [s]
dp : demand pulse multiplier [uniteless]
"""
[tc, ts, tp, dp] = rule
demand_node = self.get_node(name)
demand_node.pulse_coeff = demandpulse(self.time_step, self.simulation_period,
tc, ts, tp, dp)
demand_node.pulse_status = True
[docs] def add_surge_tank(self, name, shape, tank_type='open'):
""" Add surge tank
Parameters
----------
name : str
the name of the node to add a surge tank
shape : list
if closed: [As, Ht, Hs]
As : cross-sectional area of the surge tank
Ht : tank height
Hs : initial water height in the surge tank
if open: [As]
tank_type : int
type of the surge tank, "closed" or "open",
by default 'open'
"""
surge_node = self.get_node(name)
surge_node.tank_flow = 0
shape.append(0.)
if tank_type == 'open':
surge_node.transient_node_type = 'SurgeTank'
elif tank_type == 'closed':
surge_node.transient_node_type = 'Chamber'
surge_node.tank_height = shape[1]
surge_node.water_level = shape[-2]
else :
print ("tank type can be 'closed' or 'open'.")
surge_node.tank_shape = shape # append tank flow
[docs] def detect_pressure_change(self, name, threshold, drift, show=False, ax=None):
"""Detect pressure change in simulation results
Parameters
----------
name : str
The name of the node
threshold : positive number, optional (default = 1)
amplitude threshold for the change in the data.
drift : positive number, optional (default = 0)
drift term that prevents any change in the absence of change.
show : bool, optional (default = True)
True (1) plots data in matplotlib figure, False (0) don't plot.
ax : a matplotlib.axes.Axes instance, optional (default = None).
"""
time = self.simulation_timestamps
x = self.get_node(name)._head
ta, tf, amp = detect_cusum(time, x, threshold, drift,
show, ax=None)
ta = [time[i] for i in ta]
tf = [time[i] for i in tf]
print ('%s changes detected in pressure results on node %s' %(len(ta), name))
return ta, tf, list(amp)
[docs] def plot_node_head(self, name, ax=None):
"""Detect pressure change in simulation results
Parameters
----------
name : str or list
The name of node
ax : a matplotlib.axes.Axes instance, optional (default = None).
"""
try:
import matplotlib.pyplot as plt
except ImportError:
print('matplotlib is not available.')
else:
if ax is None:
fig, ax = plt.subplots(1, 1, figsize=(8,4),dpi=100, facecolor='w', edgecolor='k')
if not type(name) is list:
name = [name]
nodes = [self.get_node(i) for i in name]
time = self.simulation_timestamps
for i,node in enumerate(nodes):
ax.plot(time, node._head, lw=2, label=name[i])
plt.xlim([self.simulation_timestamps[0],self.simulation_timestamps[-1]])
# plt.title('Pressure Head at Node(s) ')
plt.xlabel("Time [s]", fontsize=14)
plt.ylabel("Pressure Head [m]", fontsize=14)
plt.legend(loc='best', framealpha=.5, numpoints=1)
plt.grid(False)
plt.show()