Commit a2b7641

robin committed
improve reading stream for pipeline
1 parent 01669cb commit a2b7641

4 files changed: +19 -17 lines changed

docs/pipeline.md

Lines changed: 7 additions & 4 deletions
@@ -21,7 +21,7 @@ The constructor of the pipeline module will recognize which method is fit for ru
 | n | The `n` is the rowsize of chunks for reading the filterbank as stream. |
 | size | The size parameter is used for deciding the size of the filterbank. |
 
-After deciding which method to run for running the filterbank in a pipeline, it will measure the time it takes to run each method. At the end it will append the results to a txt file.
+After deciding which method to run for running the filterbank in a pipeline, it will measure the time it takes to run each method using `measure_method`. After running all the different methods, the constructor will append the results (a dictionary) to a txt file.
 
 ## 7.2 Read rows
 
@@ -33,20 +33,23 @@ pipeline.Pipeline(<filterbank_file>, as_stream=True)
 
 ## 7.3 Read n rows
 
-The `read_n_rows` method first splits all the filterbank data into chunks of n samples. After splitting the filterbank data in chunks, it will run the different modules of the pipeline for each chunk.
+The `read_n_rows` method first splits all the filterbank data into chunks of n samples. After splitting the filterbank data in chunks, it will run the different modules of the pipeline for each chunk. The remaining data, that which does not fit into the sample size, is currently ignored.
+
+The `n` or sample size should be a power of 2 multiplied with the given scale for the downsampling.
 
 ```
 pipeline.Pipeline(<filterbank_file>, n=<size> , as_stream=True)
 ```
 
 ## 7.4 Read static
 
-The `read_static` method reads the entire filterbank at once. If the filterbank file is too large for this method, the alternative is using `read_n_rows`.
+The `read_static` method reads the entire filterbank at once, and applies each method to the entire dataset. If the filterbank file is too large for running it in-memory, the alternative is using `read_n_rows`.
 
 ```
 pipeline.Pipeline(<filterbank_file>)
 ```
 
 ## 7.5 Measure methods
 
-The `measure_methods` is ran for each of the above methods, and calculates the time it takes to run each of the different methods.
+The `measure_methods` is ran for each of the above methods, and calculates the time it takes to run each of the different methods. For each method it will create a key using the name of the method, and save the time it took to run the method as a value.
+At the end, it will returns a dictionary with all the keys and values.
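
Two behaviours described in the documentation changes above are the chunking that ignores leftover rows (7.3) and the name-to-time dictionary built by `measure_methods` (7.5). They can be sketched roughly as follows. This is a minimal illustration, not the repository's implementation: `split_into_chunks`, `clip` and `total` are hypothetical names, and the signature given to `measure_methods` here is an assumption, since the commit does not show the real one.

```python
import time


def split_into_chunks(rows, n):
    """Split rows into consecutive chunks of exactly n rows.

    Rows left over after the last full chunk are dropped, mirroring the
    "remaining data is currently ignored" note in section 7.3.
    """
    if n <= 0:
        raise ValueError("n must be positive")
    full_chunks = len(rows) // n
    return [rows[i * n:(i + 1) * n] for i in range(full_chunks)]


def measure_methods(methods, data):
    """Time each callable and return {method name: elapsed seconds}.

    Assumed signature; the real method in the pipeline may differ.
    """
    timings = {}
    for method in methods:
        start = time.perf_counter()
        method(data)
        timings[method.__name__] = time.perf_counter() - start
    return timings


# Hypothetical stand-ins for pipeline modules.
def clip(data):
    return [min(value, 100) for value in data]


def total(data):
    return sum(data)


# 10 rows in chunks of 4: two full chunks, the last 2 rows are ignored.
chunks = split_into_chunks(list(range(10)), 4)
results = measure_methods([clip, total], list(range(1000)))
```

Keying the dictionary on `method.__name__` matches the documented "key using the name of the method"; the timer choice (`time.perf_counter`) is an assumption.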

examples/run_pipeline.py

Lines changed: 4 additions & 4 deletions
@@ -11,15 +11,15 @@
 # init filterbank filename
 fil_name = "./pspm.fil"
 # init filterbank sample size
-sample_size = 1534
+sample_size = 49152
 # init times the pipeline should run
-n_times = 1000
+n_times = 10
 
 # run the filterbank n times
-for i in range(n):
+for i in range(n_times):
     # read static
     Pipeline(filename=fil_name, size=sample_size)
     # read stream, row per row
     Pipeline(filename=fil_name, as_stream=True)
     # read stream, n rows
-    Pipeline(filename=fil_name, as_stream=True, n=10)
+    Pipeline(filename=fil_name, as_stream=True, n=sample_size)
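
The new `sample_size` can be checked against the rule stated in docs/pipeline.md (a power of 2 multiplied by the downsampling scale). The sketch below is illustrative only: `is_valid_sample_size` is a hypothetical helper that is not part of the pipeline, and the scale of 3 is an assumption chosen because 49152 = 3 × 2^14. The commit does not state which scale is actually used.

```python
def is_power_of_two(value):
    """Return True if value is a positive integer power of 2."""
    return value > 0 and (value & (value - 1)) == 0


def is_valid_sample_size(n, scale):
    """Hypothetical check: n must equal scale * 2**k for some integer k >= 0."""
    if scale <= 0 or n % scale != 0:
        return False
    return is_power_of_two(n // scale)


# Assumed downsampling scale of 3 (not stated in the commit).
new_ok = is_valid_sample_size(49152, 3)  # 49152 = 3 * 2**14
old_ok = is_valid_sample_size(1534, 3)   # 1534 is not divisible by 3
```

Under that assumption the new value satisfies the rule while the old value of 1534 (which factors as 2 × 13 × 59) does not, for a scale of 3 or for a plain power of 2.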

filterbank/filterbank.py

Lines changed: 7 additions & 9 deletions
@@ -25,8 +25,7 @@ def __init__(self, filename, freq_range=None, time_range=None, read_all=False):
         """
         if not os.path.isfile(filename):
             raise FileNotFoundError(filename)
-        # iterator for stream
-        self.stream_iter = 0
+        # header values
         self.data, self.freqs, self.n_chans_selected = None, None, None
         self.filename = filename
         self.header = read_header(filename)
@@ -51,6 +50,8 @@ def __init__(self, filename, freq_range=None, time_range=None, read_all=False):
         self.fil.seek(int(self.ii_start * self.n_bytes * self.n_ifs * self.n_chans), 1)
         # find possible channels
         self.i_0, self.i_1 = self.setup_chans(freq_range)
+        # number if stream iterations
+        self.stream_iter = (self.n_samples * self.n_ifs)
         # read filterbank at once
         if read_all:
             self.read_filterbank()
@@ -84,8 +85,8 @@ def next_row(self):
 
             returns False if EOF
         """
-        if self.stream_iter < (self.n_samples * self.n_ifs):
-            self.stream_iter += 1
+        if self.stream_iter > 0:
+            self.stream_iter -= 1
             # skip bytes
             self.fil.seek(self.n_bytes * self.i_0, 1)
             # read row of data
@@ -104,11 +105,8 @@ def next_n_rows(self, n_rows):
 
             returns False if EOF
         """
-        if self.stream_iter < (self.n_samples * self.n_ifs):
-            # more rows requested than available
-            if self.stream_iter + n_rows >= self.n_samples * self.n_ifs:
-                n_rows = self.n_samples * self.n_ifs - self.stream_iter
-            self.stream_iter += n_rows
+        if self.stream_iter - n_rows > 0:
+            self.stream_iter -= n_rows
             # init array of n rows
             data = np.zeros((n_rows, self.n_chans_selected), dtype=self.dd_type)
             for row in range(n_rows):
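
The countdown logic that this change introduces in `next_n_rows` can be modelled without any file I/O. The sketch below is a simplified stand-in, not the `Filterbank` class itself: `CountdownStream` and `count_chunks` are hypothetical names, and the model assumes that the method returns False whenever the condition fails, as the docstring ("returns False if EOF") suggests. As written in the hunk, the comparison is strict (`> 0`), so a final chunk that exactly exhausts the counter would also be rejected. The `strict=False` variant shows a `>= 0` comparison for contrast; the commit does not say which behaviour is intended.

```python
class CountdownStream:
    """Hypothetical model of the row counter used by next_n_rows."""

    def __init__(self, total_rows, strict=True):
        self.remaining = total_rows
        self.strict = strict

    def next_n_rows(self, n_rows):
        """Return n_rows if a chunk is served, or False at end of stream."""
        left_after = self.remaining - n_rows
        served = left_after > 0 if self.strict else left_after >= 0
        if not served:
            return False
        self.remaining = left_after
        return n_rows


def count_chunks(total_rows, n_rows, strict=True):
    """Count how many full chunks the stream serves before returning False."""
    stream = CountdownStream(total_rows, strict)
    chunks = 0
    while stream.next_n_rows(n_rows):
        chunks += 1
    return chunks


# 10 rows in chunks of 5: the strict check serves only the first chunk.
strict_chunks = count_chunks(10, 5, strict=True)
inclusive_chunks = count_chunks(10, 5, strict=False)
# 11 rows in chunks of 5: both full chunks are served, 1 row is ignored.
remainder_chunks = count_chunks(11, 5, strict=True)
```

In this model, a row count that is an exact multiple of `n_rows` loses its last full chunk under the strict comparison, which goes slightly beyond the "remaining data is ignored" behaviour described in docs/pipeline.md.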

fourier/fourier.py

Lines changed: 1 addition & 0 deletions
@@ -80,6 +80,7 @@ def fft_vectorized(input_data, nfft=None, axis=-1):
     input_data = zeroes
 
     data_length = input_data.shape[0]
+    print(data_length)
 
     if np.log2(data_length) % 1 > 0:
         raise ValueError("Size of input data must be a power of 2")
