Skip to content

Commit deb7c5a

Browse files
feat: add Deep Belief Network (DBN) using RBMs in pure NumPy
Implement a multi-layer DBN constructed by stacking Restricted Boltzmann Machines trained with contrastive divergence. The implementation uses Gibbs sampling for binary units and manual weight updates with NumPy, without external deep learning frameworks. Includes layer-wise pretraining, a reconstruction method, and visualization of original vs reconstructed samples. This code serves as an educational and foundational contribution for unsupervised feature learning and can be extended for fine-tuning deep neural networks.
1 parent d88d269 commit deb7c5a

File tree

1 file changed

+322
-0
lines changed

1 file changed

+322
-0
lines changed
Lines changed: 322 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,322 @@
1+
"""
2+
- - - - - -- - - - - - - - - - - - - - - - - - - - - - -
3+
Name - - Deep Belief Network (DBN) Using Restricted Boltzmann Machines (RBMs)
4+
Goal - - Unsupervised layer-wise feature learning and pretraining for deep neural networks
5+
Detail: Multi-layer DBN constructed by stacking RBMs trained via contrastive divergence.
6+
Implements Gibbs sampling for binary units, manual weight updates with NumPy.
7+
Developed for Intrusion Detection System (IDS) in WiFi networks.
8+
This implementation is written entirely in pure NumPy, with no deep learning frameworks.
9+
Can be extended for fine-tuning deep neural networks.
10+
11+
Author: Adhithya Laxman Ravi Shankar Geetha
12+
GitHub: https://github.com/Adhithya-Laxman/
13+
Date: 2025.10.21
14+
- - - - - -- - - - - - - - - - - - - - - - - - - - - - -
15+
"""
16+
17+
import numpy as np
18+
import matplotlib.pyplot as plt
19+
20+
21+
class RBM:
22+
def __init__(self, n_visible, n_hidden, learning_rate=0.01, k=1, epochs=10, batch_size=64, mode='bernoulli'):
23+
"""
24+
Initialize an RBM.
25+
26+
Args:
27+
n_visible (int): Number of visible units.
28+
n_hidden (int): Number of hidden units.
29+
learning_rate (float): Learning rate for weight updates.
30+
k (int): Number of Gibbs sampling steps.
31+
epochs (int): Number of training epochs.
32+
batch_size (int): Batch size.
33+
mode (str): Sampling mode ('bernoulli' or 'gaussian').
34+
"""
35+
self.n_visible = n_visible
36+
self.n_hidden = n_hidden
37+
self.learning_rate = learning_rate
38+
self.k = k
39+
self.epochs = epochs
40+
self.batch_size = batch_size
41+
self.mode = mode
42+
43+
# Initialize weights and biases
44+
self.weights = np.random.normal(0, 0.01, (n_visible, n_hidden))
45+
self.hidden_bias = np.zeros(n_hidden)
46+
self.visible_bias = np.zeros(n_visible)
47+
48+
def sigmoid(self, x):
49+
"""
50+
Compute the sigmoid activation function.
51+
52+
Args:
53+
x (np.ndarray): Input array.
54+
55+
Returns:
56+
np.ndarray: Sigmoid of input.
57+
"""
58+
return 1.0 / (1.0 + np.exp(-x))
59+
60+
def sample_prob(self, probs):
61+
"""
62+
Sample binary states from given probabilities.
63+
64+
Args:
65+
probs (np.ndarray): Probabilities of activation.
66+
67+
Returns:
68+
np.ndarray: Sampled binary values.
69+
"""
70+
return (np.random.rand(*probs.shape) < probs).astype(float)
71+
72+
def sample_hidden_given_visible(self, v):
73+
"""
74+
Sample hidden units conditioned on visible units.
75+
76+
Args:
77+
v (np.ndarray): Visible units.
78+
79+
Returns:
80+
tuple: (hidden probabilities, hidden samples)
81+
"""
82+
hid_probs = self.sigmoid(np.dot(v, self.weights) + self.hidden_bias)
83+
hid_samples = self.sample_prob(hid_probs)
84+
return hid_probs, hid_samples
85+
86+
def sample_visible_given_hidden(self, h):
87+
"""
88+
Sample visible units conditioned on hidden units.
89+
90+
Args:
91+
h (np.ndarray): Hidden units.
92+
93+
Returns:
94+
tuple: (visible probabilities, visible samples)
95+
"""
96+
vis_probs = self.sigmoid(np.dot(h, self.weights.T) + self.visible_bias)
97+
vis_samples = self.sample_prob(vis_probs)
98+
return vis_probs, vis_samples
99+
100+
def contrastive_divergence(self, v0):
101+
"""
102+
Perform Contrastive Divergence (CD-k) step.
103+
104+
Args:
105+
v0 (np.ndarray): Initial visible units (data batch).
106+
107+
Returns:
108+
float: Reconstruction loss for the batch.
109+
"""
110+
h_probs0, h0 = self.sample_hidden_given_visible(v0)
111+
vk, hk = v0, h0
112+
113+
for _ in range(self.k):
114+
v_probs, vk = self.sample_visible_given_hidden(hk)
115+
h_probs, hk = self.sample_hidden_given_visible(vk)
116+
117+
# Compute gradients
118+
positive_grad = np.dot(v0.T, h_probs0)
119+
negative_grad = np.dot(vk.T, h_probs)
120+
121+
# Update weights and biases
122+
self.weights += self.learning_rate * (positive_grad - negative_grad) / v0.shape[0]
123+
self.visible_bias += self.learning_rate * np.mean(v0 - vk, axis=0)
124+
self.hidden_bias += self.learning_rate * np.mean(h_probs0 - h_probs, axis=0)
125+
126+
loss = np.mean((v0 - vk) ** 2)
127+
return loss
128+
129+
def train(self, data):
130+
"""
131+
Train the RBM on given data.
132+
133+
Args:
134+
data (np.ndarray): Training data matrix.
135+
"""
136+
n_samples = data.shape[0]
137+
for epoch in range(self.epochs):
138+
np.random.shuffle(data)
139+
losses = []
140+
141+
for i in range(0, n_samples, self.batch_size):
142+
batch = data[i:i + self.batch_size]
143+
loss = self.contrastive_divergence(batch)
144+
losses.append(loss)
145+
146+
print(f"Epoch [{epoch + 1}/{self.epochs}] avg loss: {np.mean(losses):.6f}")
147+
148+
149+
class DeepBeliefNetwork:
    """Deep Belief Network: a stack of RBMs pretrained greedily layer by layer.

    Each layer is trained as an independent RBM on (a smoothed sample of)
    the previous layer's hidden activations; the learned weights and
    biases are then frozen into ``layer_params``.
    """

    def __init__(self, input_size, layers, mode='bernoulli', k=5, save_path=None,
                 rbm_learning_rate=0.01, rbm_k=5, rbm_epochs=300, rbm_batch_size=64):
        """
        Initialize a Deep Belief Network.

        Args:
            input_size (int): Number of input features.
            layers (list): List of hidden layer sizes.
            mode (str): Sampling mode ('bernoulli' or 'gaussian'),
                forwarded to each layer's RBM (previously it was
                accepted here but never passed on).
            k (int): Number of sampling passes averaged in
                generate_input_for_layer.
            save_path (str): If given, trained parameters are saved to
                this path as a NumPy .npz archive after train_dbn.
            rbm_learning_rate (float): Learning rate for each layer RBM.
            rbm_k (int): CD-k Gibbs steps for each layer RBM
                (default 5, matching the previous hard-coded value).
            rbm_epochs (int): Training epochs for each layer RBM
                (default 300, matching the previous hard-coded value).
            rbm_batch_size (int): Mini-batch size for each layer RBM.
        """
        self.input_size = input_size
        self.layers = layers
        self.k = k
        self.mode = mode
        self.save_path = save_path
        self.rbm_learning_rate = rbm_learning_rate
        self.rbm_k = rbm_k
        self.rbm_epochs = rbm_epochs
        self.rbm_batch_size = rbm_batch_size
        # One parameter dict per layer, filled in by train_dbn.
        self.layer_params = [{'W': None, 'hb': None, 'vb': None} for _ in layers]

    def sigmoid(self, x):
        """
        Numerically stable element-wise sigmoid.

        Args:
            x (np.ndarray): Input array.

        Returns:
            np.ndarray: Sigmoid output.
        """
        # Clip to avoid overflow in np.exp for large-magnitude inputs.
        return 1.0 / (1.0 + np.exp(-np.clip(x, -500.0, 500.0)))

    def sample_prob(self, probs):
        """
        Sample binary states from probabilities.

        Args:
            probs (np.ndarray): Probabilities in [0, 1].

        Returns:
            np.ndarray: Binary samples (0.0 or 1.0) of the same shape.
        """
        return (np.random.rand(*probs.shape) < probs).astype(float)

    def sample_h(self, x, W, hb):
        """
        Sample hidden units given visible units.

        Args:
            x (np.ndarray): Visible units.
            W (np.ndarray): Weight matrix, shape (n_visible, n_hidden).
            hb (np.ndarray): Hidden biases.

        Returns:
            tuple: (hidden probabilities, hidden samples).
        """
        probs = self.sigmoid(np.dot(x, W) + hb)
        samples = self.sample_prob(probs)
        return probs, samples

    def sample_v(self, y, W, vb):
        """
        Sample visible units given hidden units.

        Args:
            y (np.ndarray): Hidden units.
            W (np.ndarray): Weight matrix, shape (n_visible, n_hidden).
            vb (np.ndarray): Visible biases.

        Returns:
            tuple: (visible probabilities, visible samples).
        """
        probs = self.sigmoid(np.dot(y, W.T) + vb)
        samples = self.sample_prob(probs)
        return probs, samples

    def generate_input_for_layer(self, layer_index, x):
        """
        Generate smoothed input for a layer by averaging repeated samples.

        Propagates x through the already-trained layers ``self.k`` times
        and averages the stochastic hidden samples, reducing sampling
        noise in the input seen by the next RBM.

        Args:
            layer_index (int): Index of the layer about to be trained.
            x (np.ndarray): Original input data.

        Returns:
            np.ndarray: Smoothed input for the layer (a copy for layer 0).
        """
        if layer_index == 0:
            # Copy so the RBM can never alias the caller's data.
            return x.copy()
        samples = []
        for _ in range(self.k):
            x_dash = x.copy()
            for i in range(layer_index):
                _, x_dash = self.sample_h(x_dash, self.layer_params[i]['W'], self.layer_params[i]['hb'])
            samples.append(x_dash)
        return np.mean(np.stack(samples, axis=0), axis=0)

    def train_dbn(self, x):
        """
        Train the DBN greedily, one layer at a time.

        Args:
            x (np.ndarray): Training data, shape (n_samples, input_size).
        """
        for idx, layer_size in enumerate(self.layers):
            n_visible = self.input_size if idx == 0 else self.layers[idx - 1]

            rbm = RBM(
                n_visible,
                layer_size,
                learning_rate=self.rbm_learning_rate,
                k=self.rbm_k,
                epochs=self.rbm_epochs,
                batch_size=self.rbm_batch_size,
                mode=self.mode,  # previously dropped: RBM got its default mode
            )
            x_input = self.generate_input_for_layer(idx, x)
            rbm.train(x_input)
            self.layer_params[idx]['W'] = rbm.weights
            self.layer_params[idx]['hb'] = rbm.hidden_bias
            self.layer_params[idx]['vb'] = rbm.visible_bias
            print(f"Finished training layer {idx + 1}/{len(self.layers)}")

        # save_path was previously accepted but never used.
        if self.save_path is not None:
            self._save_parameters()

    def _save_parameters(self):
        """Persist all layer parameters to ``save_path`` as a .npz archive."""
        arrays = {}
        for i, params in enumerate(self.layer_params):
            arrays[f'W{i}'] = params['W']
            arrays[f'hb{i}'] = params['hb']
            arrays[f'vb{i}'] = params['vb']
        np.savez(self.save_path, **arrays)

    def reconstruct(self, x):
        """
        Reconstruct input data via a full up-and-down stochastic pass.

        Args:
            x (np.ndarray): Input data, shape (n_samples, input_size).

        Returns:
            tuple: (encoded top-layer representation, reconstructed
            input, mean squared reconstruction error).
        """
        # Forward (recognition) pass through every layer.
        h = x.copy()
        for i in range(len(self.layer_params)):
            _, h = self.sample_h(h, self.layer_params[i]['W'], self.layer_params[i]['hb'])
        encoded = h.copy()

        # Backward (generative) pass from the top layer down.
        for i in reversed(range(len(self.layer_params))):
            _, h = self.sample_v(h, self.layer_params[i]['W'], self.layer_params[i]['vb'])
        reconstructed = h

        # Compute reconstruction error (Mean Squared Error).
        error = np.mean((x - reconstructed) ** 2)
        print(f"Reconstruction error: {error:.6f}")

        return encoded, reconstructed, error
290+
291+
# Usage example
if __name__ == "__main__":
    # Generate a synthetic binary dataset: 100 samples x 16 features.
    data = np.random.randint(0, 2, (100, 16)).astype(float)

    # Initialize a DBN with three stacked RBM layers.
    dbn = DeepBeliefNetwork(input_size=16, layers=[16, 8, 4])

    # Greedy layer-wise pretraining.
    dbn.train_dbn(data)

    # Encode and reconstruct the first five samples.
    encoded, reconstructed, error = dbn.reconstruct(data[:5])
    print("Encoded shape:", encoded.shape)
    print("Reconstructed shape:", reconstructed.shape)

    # Visualization of original vs reconstructed samples.
    features_to_show = 16  # Show only the first 16 features (all of them here).
    plt.figure(figsize=(12, 5))
    for i in range(5):
        plt.subplot(2, 5, i + 1)
        plt.title(f"Original {i+1}")
        plt.imshow(data[i][:features_to_show].reshape(1, -1), cmap='gray', aspect='auto', interpolation='nearest')
        plt.axis('off')

        plt.subplot(2, 5, i + 6)
        plt.title(f"Reconstructed {i+1}")
        plt.imshow(reconstructed[i][:features_to_show].reshape(1, -1), cmap='gray', aspect='auto', interpolation='nearest')
        plt.axis('off')
    plt.suptitle(f"DBN Reconstruction (First {features_to_show} Features, MSE: {error:.6f})")
    # Reserve headroom so tight_layout does not overlap the suptitle.
    plt.tight_layout(rect=[0, 0, 1, 0.95])
    plt.savefig('reconstruction_subset.png')
    print("Subset reconstruction plot saved as 'reconstruction_subset.png'")

0 commit comments

Comments
 (0)