Commit 46a5aea

Merge pull request #41 from AUAS-Pulsar/pipeline
Pipeline module
2 parents d27d706 + f2371b0

File tree: 15 files changed, 314 additions, 29 deletions

README.md

Lines changed: 6 additions & 0 deletions

@@ -45,3 +45,9 @@ Creating a free and open source framework that contains the generic algorithms a
     2. [Filter samples](docs/clipping.md#62-filter-samples)
     3. [Filter channels](docs/clipping.md#63-filter-channels)
     4. [Filter individual channels](docs/clipping.md#64-filter-individual-channels)
+7. [Pipeline](docs/pipeline.md)
+    1. [Introduction](docs/pipeline.md#71-introduction)
+    2. [Read rows](docs/pipeline.md#72-read-rows)
+    3. [Read n rows](docs/pipeline.md#73-read-n-rows)
+    4. [Read static](docs/pipeline.md#74-read-static)
+    5. [Measure methods](docs/pipeline.md#75-measure-methods)

clipping/clipping.py

Lines changed: 10 additions & 13 deletions

@@ -13,19 +13,21 @@ def clipping(channels, samples):
     # remove all rows (samples) with noise
     samples = filter_samples(samples)
     # remove all columns (channels) with noise, select first max n samples
-    channels, samples = filter_channels(channels, samples[:n_samples])
+    bad_channels = filter_channels(samples[:n_samples])
+    # remove bad channels from samples
+    channels = np.delete(channels, bad_channels)
+    samples = np.delete(samples, bad_channels, axis=1)
     # remove all individual cells with noise
     samples = filter_indv_channels(samples)
     return channels, samples


-def filter_samples(samples):
+def filter_samples(samples, factor=11):
     """
     Calculate mean power of all frequencies per time sample
     and remove samples with significantly high power
     """
-    factor = 1.3
-    new_samples = []
+    new_samples = list()
     # calculate mean intensity per sample
     avg_sample = np.sum(samples)/len(samples)
     # remove samples with significant high power

@@ -35,13 +37,12 @@ def filter_samples(samples):
     return np.array(new_samples)


-def filter_channels(channels, samples):
+def filter_channels(samples, factor=9):
     """
     Calculate mean power of all time samples per frequency
     and remove frequencies with significantly high power
     """
-    factor = 1.3
-    bad_channels = []
+    bad_channels = list()
     # calculate the mean power per channel
     avg_power_chan = samples.mean(axis=0)
     # calculate the standard deviation per channel

@@ -52,18 +53,14 @@ def filter_channels(channels, samples):
     for i, (avg_channel, sd_channel) in enumerate(zip(avg_power_chan, sd_power_chan)):
         if avg_channel >= (avg_power * factor) or sd_channel >= (avg_power * factor):
             bad_channels.append(i)
-    # remove bad channels from samples
-    new_channels = np.delete(channels, bad_channels)
-    new_samples = np.delete(samples, bad_channels, axis=1)
-    return new_channels, new_samples
+    return bad_channels


-def filter_indv_channels(samples):
+def filter_indv_channels(samples, factor=9):
     """
     Calculate mean power per frequency
     and remove samples with significantly high power
     """
-    factor = 1.3
     new_samples = np.zeros((len(samples), len(samples[0])))
     # calculate the mean power for each sample per channel
     avg_power_chan = samples.mean(axis=0)
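The refactored `filter_channels` now returns the indices of noisy channels instead of the filtered arrays, leaving the deletion to the caller. A standalone sketch of that pattern on toy data (the `factor=3` threshold and the toy frequencies are illustrative assumptions; the module's defaults differ):

```python
import numpy as np

def find_bad_channels(samples, factor=3):
    """Return indices of channels whose mean or std power is an outlier."""
    avg_power_chan = samples.mean(axis=0)   # mean power per channel
    sd_power_chan = samples.std(axis=0)     # spread per channel
    avg_power = avg_power_chan.mean()       # global mean power
    return [i for i, (m, s) in enumerate(zip(avg_power_chan, sd_power_chan))
            if m >= avg_power * factor or s >= avg_power * factor]

# toy data: 4 time samples x 5 channels, channel 2 is noisy
samples = np.ones((4, 5))
samples[:, 2] = 1000.0
channels = np.array([1400.0, 1401.0, 1402.0, 1403.0, 1404.0])

bad = find_bad_channels(samples, factor=3)
channels = np.delete(channels, bad)        # drop the bad channel frequencies
samples = np.delete(samples, bad, axis=1)  # drop the matching sample columns
print(bad)  # [2]
```

Returning indices keeps the filter function side-effect free and lets the caller apply one deletion to both arrays, which is what the new `clipping` body does.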

docs/pipeline.md

Lines changed: 55 additions & 0 deletions

@@ -0,0 +1,55 @@
+# 7. Pipeline
+
+## 7.1 Introduction
+
+The Pipeline module executes the different modules in a specific order.
+There are currently three options for running the pipeline:
+
+* read multiple rows, `read_n_rows`
+* read single rows, `read_rows`
+* read all rows, `read_static`
+
+The constructor of the pipeline module decides which of these methods to run by inspecting the arguments it is given.
+
+| Parameter | Description |
+| --- | --- |
+| filename | The path to the filterbank file. |
+| as_stream | Whether the filterbank should be read as a stream. |
+| DM | The dispersion measure (DM) used for dedispersion. |
+| scale | The scale used for downsampling the time series. |
+| n | The row size of the chunks when reading the filterbank as a stream. |
+| size | The number of samples of the filterbank to read. |
+
+After selecting the method to run, the pipeline measures the time each step takes using `measure_methods`. Once all steps have run, the constructor appends the results (a dictionary) to a txt file.
+
+## 7.2 Read rows
+
+The `read_rows` method reads the filterbank data row by row. Because it only reads a single row at a time, it is unable to execute most methods. The alternative is the `read_n_rows` method, which can run all methods.
+
+```
+pipeline.Pipeline(<filterbank_file>, as_stream=True)
+```
+
+## 7.3 Read n rows
+
+The `read_n_rows` method first splits the filterbank data into chunks of n samples, then runs the different modules of the pipeline on each chunk. Remaining data that does not fit into the chunk size is currently ignored.
+
+The `n` (chunk size) should be a power of 2 multiplied by the given downsampling scale.
+
+```
+pipeline.Pipeline(<filterbank_file>, n=<size>, as_stream=True)
+```
+
+## 7.4 Read static
+
+The `read_static` method reads the entire filterbank at once and applies each method to the full dataset. If the filterbank file is too large to process in memory, use `read_n_rows` instead.
+
+```
+pipeline.Pipeline(<filterbank_file>)
+```
+
+## 7.5 Measure methods
+
+`measure_methods` is run for each of the above read methods and times each pipeline step. For each step it creates a key named after the step and stores the elapsed time as the value.
+At the end, it returns a dictionary with all the keys and values.
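The power-of-two constraint on `n` described in section 7.3 can be checked up front. A small hypothetical helper (`valid_chunk_size` is not part of the module) makes the rule concrete; note that the `49152` used in `examples/run_pipeline.py` satisfies it, since 49152 = 2**14 * 3:

```python
def valid_chunk_size(n, scale):
    """True if n == 2**k * scale for some integer k >= 0."""
    if n <= 0 or scale <= 0 or n % scale != 0:
        return False
    q = n // scale
    # a positive integer q is a power of two iff q & (q - 1) == 0
    return q & (q - 1) == 0

print(valid_chunk_size(49152, 3))  # True: 49152 = 2**14 * 3
print(valid_chunk_size(50000, 3))  # False: not divisible by 3
```

Checking this before constructing the pipeline avoids feeding the downsampler chunks it cannot divide evenly.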

examples/run_pipeline.py

Lines changed: 25 additions & 0 deletions

@@ -0,0 +1,25 @@
+"""
+Script for running the pipeline
+"""
+# pylint: disable-all
+import os, sys, inspect
+CURRENT_DIR = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
+PARENT_DIR = os.path.dirname(CURRENT_DIR)
+sys.path.insert(0, PARENT_DIR)
+from pipeline.pipeline import Pipeline
+
+# init filterbank filename
+fil_name = os.path.abspath("filterbank.fil")
+# init filterbank sample size
+sample_size = 49152
+# init times the pipeline should run
+n_times = 10
+
+# run the pipeline n times
+for i in range(n_times):
+    # read static
+    Pipeline(filename=fil_name, size=sample_size)
+    # read stream, row per row
+    Pipeline(filename=fil_name, as_stream=True)
+    # read stream, n rows
+    Pipeline(filename=fil_name, as_stream=True, n=sample_size)

filterbank/__init__.py

Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
+"""
+Export filterbank methods
+"""
+from .filterbank import Filterbank
+from .filterbank import read_header

filterbank/filterbank.py

Lines changed: 7 additions & 9 deletions

@@ -25,8 +25,7 @@ def __init__(self, filename, freq_range=None, time_range=None, read_all=False):
         """
         if not os.path.isfile(filename):
             raise FileNotFoundError(filename)
-        # iterator for stream
-        self.stream_iter = 0
+        # header values
         self.data, self.freqs, self.n_chans_selected = None, None, None
         self.filename = filename
         self.header = read_header(filename)

@@ -51,6 +50,8 @@ def __init__(self, filename, freq_range=None, time_range=None, read_all=False):
         self.fil.seek(int(self.ii_start * self.n_bytes * self.n_ifs * self.n_chans), 1)
         # find possible channels
         self.i_0, self.i_1 = self.setup_chans(freq_range)
+        # number of stream iterations
+        self.stream_iter = (self.n_samples * self.n_ifs)
         # read filterbank at once
         if read_all:
             self.read_filterbank()

@@ -84,8 +85,8 @@ def next_row(self):

         returns False if EOF
         """
-        if self.stream_iter < (self.n_samples * self.n_ifs):
-            self.stream_iter += 1
+        if self.stream_iter > 0:
+            self.stream_iter -= 1
             # skip bytes
             self.fil.seek(self.n_bytes * self.i_0, 1)
             # read row of data

@@ -104,11 +105,8 @@ def next_n_rows(self, n_rows):

         returns False if EOF
         """
-        if self.stream_iter < (self.n_samples * self.n_ifs):
-            # more rows requested than available
-            if self.stream_iter + n_rows >= self.n_samples * self.n_ifs:
-                n_rows = self.n_samples * self.n_ifs - self.stream_iter
-            self.stream_iter += n_rows
+        if self.stream_iter - n_rows > 0:
+            self.stream_iter -= n_rows
             # init array of n rows
             data = np.zeros((n_rows, self.n_chans_selected), dtype=self.dd_type)
             for row in range(n_rows):
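The change above flips the stream counter from counting up toward the total to counting down the rows remaining, which simplifies the EOF check. A toy sketch of the same countdown pattern (`CountdownReader` is a hypothetical stand-in, not the repo's `Filterbank` class):

```python
class CountdownReader:
    """Stream reader that tracks rows *remaining* rather than rows consumed."""

    def __init__(self, data):
        self.data = list(data)
        self.pos = 0
        self.remaining = len(self.data)  # rows left to stream

    def next_n_rows(self, n_rows):
        # Strict '>' mirrors the diff: once fewer than (or exactly) n_rows
        # remain, the chunk is not emitted, so trailing data is dropped --
        # the behaviour docs/pipeline.md describes for read_n_rows.
        if self.remaining - n_rows <= 0:
            return False
        self.remaining -= n_rows
        chunk = self.data[self.pos:self.pos + n_rows]
        self.pos += n_rows
        return chunk

reader = CountdownReader(range(10))
print(reader.next_n_rows(4))  # [0, 1, 2, 3]
print(reader.next_n_rows(4))  # [4, 5, 6, 7]
print(reader.next_n_rows(4))  # False (the last 2 rows are dropped)
```

The countdown form needs one comparison against zero instead of recomputing `n_samples * n_ifs` on every call, at the cost of discarding the final partial chunk.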

fourier/__init__.py

Lines changed: 1 addition & 1 deletion

@@ -1,4 +1,4 @@
 """
 import file for fourier.py
 """
-from .fourier import fft_freq, fft_vectorized, dft_slow, fft_matrix, fft_shift
+from .fourier import fft_freq, fft_vectorized, dft_slow, fft_matrix, fft_shift, ifft
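This commit starts exporting an `ifft` from the fourier module, though its implementation is not shown in this diff. One standard way to express an inverse DFT in terms of a forward transform is the conjugation identity, sketched here with NumPy's FFT standing in for the module's own routines (an illustration only; the repo's `ifft` may be implemented differently):

```python
import numpy as np

def ifft_via_fft(x):
    """Inverse DFT via the forward transform: ifft(x) = conj(fft(conj(x))) / N."""
    x = np.asarray(x, dtype=complex)
    return np.conj(np.fft.fft(np.conj(x))) / len(x)

signal = np.array([1.0, 2.0, 3.0, 4.0])
roundtrip = ifft_via_fft(np.fft.fft(signal))
print(np.allclose(roundtrip, signal))  # True
```

The identity lets a library reuse a single well-tested forward FFT for both directions instead of maintaining two kernels.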

pipeline/__init__.py

Lines changed: 4 additions & 0 deletions

@@ -0,0 +1,4 @@
+"""
+Export pipeline methods
+"""
+from .pipeline import Pipeline

pipeline/pipeline.py

Lines changed: 146 additions & 0 deletions

@@ -0,0 +1,146 @@
+"""
+Pipeline for running all the modules in order
+"""
+# pylint: disable=wrong-import-position
+import os
+import sys
+import inspect
+CURRENT_DIR = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
+PARENT_DIR = os.path.dirname(CURRENT_DIR)
+sys.path.insert(0, PARENT_DIR)
+from timeit import default_timer as timer
+import filterbank.filterbank
+import timeseries.timeseries
+import clipping
+import dedisperse
+import fourier
+
+# pylint: disable=too-many-locals
+# pylint: disable=too-many-arguments
+# pylint: disable=invalid-name
+# pylint: disable=no-self-use
+
+class Pipeline:
+    """
+    The Pipeline combines the functionality of all modules
+    in the library.
+    """
+
+    def __init__(self, filename=None, as_stream=False, DM=230, scale=3, n=None, size=None):
+        """
+        Initialize Pipeline object
+
+        Args:
+            as_stream, read the filterbank data as stream
+        """
+        if as_stream:
+            if n:
+                result = self.read_n_rows(n, filename, DM, scale)
+                file = open("n_rows_filterbank.txt", "a+")
+            else:
+                result = self.read_rows(filename)
+                file = open("rows_filterbank.txt", "a+")
+        else:
+            result = self.read_static(filename, DM, scale, size)
+            file = open("static_filterbank.txt", "a+")
+        file.write(str(result) + ",")
+        file.close()
+
+    def read_rows(self, filename):
+        """
+        Read the filterbank data as stream
+        and measure the time
+        """
+        # init filterbank as stream
+        fil = filterbank.Filterbank(filename)
+        time_start = timer()
+        while True:
+            fil_data = fil.next_row()
+            if isinstance(fil_data, bool):
+                break
+        time_stop = timer() - time_start
+        return time_stop
+
+    def read_n_rows(self, n, filename, DM, scale):
+        """
+        Read the filterbank data as stream
+        and measure the time
+        """
+        fil = filterbank.Filterbank(filename)
+        stopwatch_list = list()
+        while True:
+            stopwatch = dict.fromkeys(['time_read', 'time_select', 'time_clipping', 'time_dedisp',
+                                       'time_t_series', 'time_downsample', 'time_fft_vect',
+                                       'time_dft', 'time_ifft', 'time_fft_freq'])
+            time_start = timer()
+            fil_data = fil.next_n_rows(n)
+            # break if EOF
+            if isinstance(fil_data, bool):
+                break
+            stopwatch['time_read'] = timer() - time_start
+            # run methods
+            stopwatch = self.measure_methods(stopwatch, fil_data, fil.freqs, DM, scale)
+            stopwatch_list.append(stopwatch)
+        return stopwatch_list
+
+    def read_static(self, filename, DM, scale, size):
+        """
+        Read the filterbank data at once
+        and measure the time per function/class
+        """
+        stopwatch = dict.fromkeys(['time_read', 'time_select', 'time_clipping', 'time_dedisp',
+                                   'time_t_series', 'time_downsample', 'time_fft_vect', 'time_dft',
+                                   'time_ifft', 'time_fft_freq'])
+        time_start = timer()
+        # init filterbank
+        fil = filterbank.Filterbank(filename, read_all=True, time_range=(0, size))
+        stopwatch['time_read'] = timer() - time_start
+        # select data
+        time_select = timer()
+        freqs, fil_data = fil.select_data()
+        stopwatch['time_select'] = timer() - time_select
+        # run methods
+        stopwatch = self.measure_methods(stopwatch, fil_data, freqs, DM, scale)
+        return stopwatch
+
+    def measure_methods(self, stopwatch, fil_data, freqs, DM, scale):
+        """
+        Run and time all methods/modules
+        """
+        # clipping
+        time_clipping = timer()
+        _, _ = clipping.clipping(freqs, fil_data)
+        stopwatch['time_clipping'] = timer() - time_clipping
+        # dedisperse
+        time_dedisp = timer()
+        fil_data = dedisperse.dedisperse(fil_data, DM)
+        stopwatch['time_dedisp'] = timer() - time_dedisp
+        # timeseries
+        time_t_series = timer()
+        time_series = timeseries.Timeseries(fil_data)
+        stopwatch['time_t_series'] = timer() - time_t_series
+        # downsample
+        time_downsamp = timer()
+        time_series = time_series.downsample(scale)
+        stopwatch['time_downsample'] = timer() - time_downsamp
+        # fft vect
+        time_fft_vect = timer()
+        fourier.fft_vectorized(time_series)
+        stopwatch['time_fft_vect'] = timer() - time_fft_vect
+        # dft
+        time_dft = timer()
+        fourier.dft_slow(time_series)
+        stopwatch['time_dft'] = timer() - time_dft
+        # ifft
+        time_ifft = timer()
+        fourier.ifft(time_series)
+        stopwatch['time_ifft'] = timer() - time_ifft
+        # fft freq
+        time_fft_freq = timer()
+        fourier.fft_freq(10)
+        stopwatch['time_fft_freq'] = timer() - time_fft_freq
+        return stopwatch
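`measure_methods` repeats the same start-timer/run/record sequence for every stage. That pattern can be factored into a small helper; the sketch below is a hypothetical refactoring (`time_stage` is not part of the module) using the same `default_timer` and `dict.fromkeys` idioms as the pipeline:

```python
from timeit import default_timer as timer

def time_stage(stopwatch, key, func, *args, **kwargs):
    """Run func, record the elapsed seconds under stopwatch[key], return its result."""
    start = timer()
    result = func(*args, **kwargs)
    stopwatch[key] = timer() - start
    return result

# toy stages standing in for clipping/dedispersion/FFT etc.
stopwatch = dict.fromkeys(['time_square', 'time_sum'])
data = list(range(1000))
squares = time_stage(stopwatch, 'time_square', lambda xs: [x * x for x in xs], data)
total = time_stage(stopwatch, 'time_sum', sum, squares)
print(total)  # 332833500
```

Threading results through the helper keeps each stage to one line, so adding or reordering pipeline steps no longer requires a new timer variable per stage.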

plot/waterfall.py

Lines changed: 0 additions & 1 deletion

@@ -89,7 +89,6 @@ def update_plot_labels(self):
         else:
             freq_range = ((center_freq - sample_freq/2)/1e6,\
                           (center_freq + sample_freq*(self.scans_per_sweep - 0.5))/1e6)
-        print(self.image)
         self.image.set_extent(freq_range + (0, 1))
         self.fig.canvas.draw_idle()
