Source code for chaotic_neural_networks.networkA

from .utils import *

#------------------------------------------------------------
# Parameters: default values

N_G = 1000
"""
Generator Network: Number of neurons
"""

p_GG = 0.1
"""
Generator Network: **sparseness parameter** of the connection matrix.
Each coefficient thereof is set to \\\(0\\\) with probability \\\(1-p_{GG}\\\).
"""

p_z = 1.
"""
Sparseness parameter of the readout: a random fraction \\\(1-p_z\\\) of the components
 of \\\(\\\mathbf{w}\\\) are held to \\\(0\\\).
"""

g_Gz = 1.
"""
Scaling factor of  the feedback loop:
Increasing the feedback connections result in the network chaotic activity allowing the learning process.
"""

α = 1.
"""
Inverse Learning rate parameter: \\\(P\\\), the estimate of the inverse of the network rates correlation matrix plus a regularization term,
is initialized as $$P(0) = \\\\frac 1 α \\\mathbf{I}$$

So a sensible value of \\\(α\\\)
- depends on the target function
- ought to be chosen such that \\\(α << N\\\)

If 
- \\\(α\\\) is too small ⟹ the learning is so fast it can cause unstability issues.
- \\\(α\\\) is too large ⟹ the learning is so slow it may fail
"""


g_GG = 1.5 # g_GG > 1 ⟹ chaos
"""
Scaling factor of the connection synaptic strength matrix of the generator network.
$$g_{GG} > 1 ⟹ \\\text{chaotic behavior}$$
"""

τ = 10.
"""
Time constant of the units dynamics.
"""

dt = 0.1
"""
Network integration time step.
"""

Δt = 10*dt
"""
Time span between modifications of the readout weights: \\\(Δt ≥ dt\\\)
"""

class NetworkA:
    r"""
    Neural Architecture A:

    .. image:: http://younesse.net/images/Neuromodeling/networkA.png
       :width: 50%
       :align: center

    - A recurrent generator network with firing rates \(\mathbf{r}\) driving a linear readout unit
      with output \(z\) through weights \(\mathbf{w}\) that are modified during training.
    - Feedback to the generator network is provided by the readout unit.
    """

    def __init__(self, N_G=N_G, p_GG=p_GG, g_GG=g_GG, g_Gz=g_Gz, f=triangle,
                 dt=dt, Δt=Δt, α=α, τ=τ, seed=1, nb_outputs=1):
        self.N_G, self.p_GG, self.g_GG, self.g_Gz = N_G, p_GG, g_GG, g_Gz
        self.f, self.dt, self.Δt, self.α, self.τ, self.nb_outputs = f, dt, Δt, α, τ, nb_outputs
        self.seed = seed
        np.random.seed(seed)

        # Sparse random connectivity of the generator network: each coefficient is non-zero
        # with probability p_GG, drawn from a Gaussian of variance 1/(p_GG·N_G)
        std = 1./np.sqrt(p_GG*N_G)
        self.J_GG = std*sparse.random(N_G, N_G, density=p_GG, random_state=seed,
                                      data_rvs=np.random.randn).toarray()

        # Feedback weights, drawn uniformly in [-1, 1]
        self.J_Gz = 2*np.random.rand(N_G, nb_outputs) - 1

        self._init_variables()

    def _init_variables(self):
        self.nb_train_steps = 0
        self.time_elapsed = 0
        self.w = np.zeros((self.N_G, self.nb_outputs))
        self.x = 0.5 * np.random.randn(self.N_G)
        self.r = np.tanh(self.x)
        self.z = 0.5 * np.random.randn(self.nb_outputs)
        self.P = np.eye(self.N_G)/self.α

        # Storing the values of w and its time-derivative
        self.w_list = []
        self.w_dot_list = []
        self.z_list = {}
        self.z_list['train'], self.z_list['test'] = [], []
    def error(self, train_test='train'):
        """Compute the average training/testing error.

        Parameters
        ----------
        train_test : {'train', 'test'}, optional
            Choice of the error to compute: train or test.

        Returns
        -------
        (nb_outputs,) array
            Train or test error, depending on `train_test`
        """
        z_time, z_val = map(np.array, zip(*self.z_list[train_test]))
        f_time = self.f(z_time)
        # If there is a single target, reshape it so that it broadcasts against z_val
        if len(z_val.shape) > len(f_time.shape) == 1:
            f_time = f_time.reshape([-1, 1])
        return np.mean(np.abs(z_val-f_time), axis=0)
    def step(self, train_test='train', store=True):
        r"""Execute one time step of length ``dt`` of the network dynamics.

        Parameters
        ----------
        train_test : {'train', 'test'}, optional
            Learning phase (when \(P\) and the readout unit are updated) or test phase (no such update)
        store : bool, optional
            Whether to record the readout weights, their time-derivative and the output \(z\)

        Examples
        --------
        >>> from chaotic_neural_networks import networkA; net = networkA.NetworkA()
        >>> for _ in np.arange(0, 1200, net.dt):
        ...     net.step()
        >>> net.error()
        0.015584795078446064
        """
        dt, Δt, τ, g_GG, g_Gz = self.dt, self.Δt, self.τ, self.g_GG, self.g_Gz

        # Forward-Euler integration of the rate dynamics
        self.x = (1 - dt/τ)*self.x + g_GG*self.J_GG.dot(self.r)*dt/τ + g_Gz*self.J_Gz.dot(self.z)*dt/τ
        self.r = np.tanh(self.x)
        self.z = self.w.T.dot(self.r).flatten()

        self.time_elapsed += dt

        Δw = np.zeros(self.w.shape)

        if train_test == 'train':
            self.nb_train_steps += 1

            # The readout weights are only modified every Δt
            if self.nb_train_steps % (Δt//dt) == 0:
                # P update
                Pr = self.P.dot(self.r)
                self.P -= np.outer(Pr, self.r).dot(self.P)/(1+self.r.dot(Pr))

                # Prediction error update
                self.e_minus = self.z - self.f(self.time_elapsed)

                # Output weights update
                Δw = np.outer(self.P.dot(self.r), self.e_minus)
                self.w -= Δw

        if store:
            # Store w and \dot{w}'s norm
            self.w_list.append((self.time_elapsed, np.linalg.norm(self.w, axis=0)))
            self.w_dot_list.append((self.time_elapsed, np.linalg.norm(Δw/Δt, axis=0)))

            # Store z
            self.z_list[train_test].append((self.time_elapsed, self.z))
    def FORCE_figure(self, ts, fs, zs, xs, ws, neuron_indexes=None, already_split=True):
        lw_f, lw_z = 3.5, 1.5
        nb_split = 3  # 3 phases: pre-training, training, testing (post-training)

        if neuron_indexes is None:
            neuron_indexes = np.arange(len(xs))

        fig = plt.figure(figsize=(17, 13))
        gs = GridSpec(3, 3)

        if not already_split:
            fs_list, zs_list, xs_list, ws_list, ts_list = [np.array_split(y, nb_split, axis=len(y.shape)-1)
                                                           for y in [fs, zs, xs, ws, ts]]
        else:
            fs_list, zs_list, xs_list, ws_list, ts_list = fs, zs, xs, ws, ts

        f_lim, x_lim, w_lim = [(min([i.min() for i in L]), max([i.max() for i in L])) for L in [fs, xs, ws]]
        Δ = 1.2*(x_lim[1] - x_lim[0])

        for i, (fs_sub, zs_sub, xs_sub, ws_sub, ts_sub, title) in enumerate(
                zip(fs_list, zs_list, xs_list, ws_list, ts_list,
                    ['Spontaneous Activity', 'Learning', 'Testing'])):
            if len(fs_sub.shape) == 1:
                fs_sub = [fs_sub]
            if len(zs_sub.shape) == 1:
                zs_sub = [zs_sub]
            if len(ws_sub.shape) == 1:
                ws_sub = [ws_sub]

            # Plotting f and z
            ax_fz = fig.add_subplot(gs[0, i])
            ax_fz.set_title(title).set_fontsize('x-large')

            highest_color = .9 if len(fs_sub) > 1 else .6
            ax_fz.set_prop_cycle(plt.cycler('color', plt.cm.Greens(np.linspace(0, highest_color, len(fs_sub)+1)[1:])))
            for j, f in enumerate(fs_sub):
                ax_fz.plot(ts_sub, f, lw=lw_f, label='$f_{'+str(j+1)+'}$' if len(fs_sub) > 1 else '$f$')

            ax_fz.set_prop_cycle(plt.cycler('color', plt.cm.Reds(np.linspace(0, highest_color, len(zs_sub)+1)[1:])))
            for j, z in enumerate(zs_sub):
                ax_fz.plot(ts_sub, z, lw=lw_z, label='$z_{'+str(j+1)+'}$' if len(zs_sub) > 1 else '$z$')

            ax_fz.legend(loc='best', fancybox=True, framealpha=0.7)
            ax_fz.set_ylim((f_lim[0]-.5, f_lim[1]+.5))

            pos = [['left'], [], ['right']]
            draw_axis_lines(ax_fz, pos[i])

            # Plotting the firing rates of sample neurons
            ax_x = fig.add_subplot(gs[1, i])
            draw_axis_lines(ax_x, [])
            add_collection_curves(ax_x, ts_sub, xs_sub.T, y_lim=(x_lim[0]-.1, x_lim[1]+.1), Δ=Δ,
                                  labels=['Neuron ${}$'.format(n) for n in neuron_indexes] if i == 0 else None)

            # Plotting the time-derivative of the readout weight vector
            ax_w = fig.add_subplot(gs[2, i])
            highest_color = .9 if len(ws_sub) > 1 else .6
            ax_w.set_prop_cycle(plt.cycler('color', plt.cm.Oranges(np.linspace(0, highest_color, len(ws_sub)+1)[1:])))
            for j, w in enumerate(ws_sub):
                ax_w.plot(ts_sub, w, label=(r'$|\dot{w_{' + str(j+1) + r'}}|$') if len(ws_sub) > 1 else r'$|\dot{w}|$')
            ax_w.legend(loc='best', fancybox=True, framealpha=0.7)
            ax_w.set_ylim(*w_lim)
            draw_axis_lines(ax_w, ['bottom'])
            ax_w.set_xlabel('Time (ms)')

        fig.suptitle("FORCE Training Sequence").set_fontsize('xx-large')
        self.fig = fig
        return fig
    def FORCE_sequence(self, t_max, number_neurons=5):
        r"""
        Display a matplotlib figure of a full FORCE training sequence, showing the evolution of:

        - the network output(s)
        - the firing rates of ``number_neurons`` sample neurons
        - and the time-derivative of the readout vector \(\dot{\mathbf{w}}\)

        before training (spontaneous activity), throughout training, and after training (test phase):
        each one of these phases lasts ``t_max/3``.

        See ``training_sequence_plots.py`` in the GitHub repository for further examples.

        Examples
        --------
        >> network = networkA.NetworkA(f=utils.periodic); network.FORCE_sequence(600)
        Pre-training / Spontaneous activity...
        Training...
        > **Average Train Error:** [ 0.02805716]
        Testing...
        > **Average Test Error:** [ 2.50709125]
        """
        assert t_max % 3 == 0

        # Reinitialization of the network
        self._init_variables()

        ts_list = np.array_split(np.arange(0, t_max, self.dt), 3)
        ts_pretrain, ts_train, ts_test = ts_list
        fs_list, zs_list, xs_list, ws_list = [], [], [], []

        # Random neuron indices: the neurons we will plot
        mask_random = np.arange(self.N_G)
        np.random.shuffle(mask_random)
        mask_random = mask_random[:number_neurons]

        #------------------------------------------------------------
        # Pre-training / Spontaneous activity
        print('Pre-training / Spontaneous activity...')

        xs_sublist, zs_sublist = [], []
        for _ in ts_pretrain:
            self.step(train_test='test', store=False)
            xs_sublist.append(self.r[mask_random])
            zs_sublist.append(self.z)

        xs_list.append(np.array(xs_sublist))
        zs_list.append(np.array(list(zip(*zs_sublist))))
        ws_list.append(np.zeros((self.nb_outputs, len(ts_pretrain))))

        f_time = self.f(ts_pretrain)
        if len(f_time.shape) == 1:
            f_time = f_time.reshape([-1, 1])
        fs_list.append(np.array(list(zip(*f_time))))

        #------------------------------------------------------------
        # TRAINING phase
        print('Training...')

        xs_sublist, zs_sublist = [], []
        for _ in ts_train:
            self.step()
            xs_sublist.append(self.r[mask_random])

        xs_list.append(np.array(xs_sublist))

        _, zs_sublist = zip(*self.z_list['train'])
        _, ws_sublist = zip(*self.w_dot_list)
        zs_list.append(np.array(list(zip(*zs_sublist))))
        ws_list.append(np.array(list(zip(*ws_sublist))))

        f_time = self.f(ts_train)
        if len(f_time.shape) == 1:
            f_time = f_time.reshape([-1, 1])
        fs_list.append(np.array(list(zip(*f_time))))

        print('> **Average Train Error:** {}'.format(self.error()))

        #------------------------------------------------------------
        # TESTING phase
        print('Testing...')

        xs_sublist, zs_sublist = [], []
        for _ in ts_test:
            self.step(train_test='test')
            xs_sublist.append(self.r[mask_random])

        xs_list.append(np.array(xs_sublist))

        _, zs_sublist = zip(*self.z_list['test'])
        zs_list.append(np.array(list(zip(*zs_sublist))))
        ws_list.append(np.array(list(zip(*[ws_sublist[-1]]*len(ts_test)))))

        f_time = self.f(ts_test)
        if len(f_time.shape) == 1:
            f_time = f_time.reshape([-1, 1])
        fs_list.append(np.array(list(zip(*f_time))))

        print('> **Average Test Error:** {}'.format(self.error(train_test='test')))

        self.FORCE_figure(ts_list, fs_list, zs_list, xs_list, ws_list, neuron_indexes=mask_random).show()
    def _principal_components_figure(self, ts, fs_list, zs_list, xs_list, eigvals):
        """Plot the target function(s) and their approximations reconstructed from the leading
        principal components, the PC projections of the network activity, and the eigenvalue spectrum."""
        lw_f, lw_z = 3.5, 1.5

        fig = plt.figure(figsize=(17, 13))
        gs = GridSpec(3, 1)

        f_lim, x_lim = [(i.min(), i.max()) for i in [fs_list, xs_list]]
        Δ = 1.2*(x_lim[1] - x_lim[0])

        if len(fs_list.shape) == 1:
            fs_list = [fs_list]
        if len(zs_list.shape) == 1:
            zs_list = [zs_list]

        # Plotting f and z_eig (projection on leading components)
        ax_fz = fig.add_subplot(gs[0])
        ax_fz.set_title('Projection onto the {} leading Principal Components (PC)'.format(len(xs_list))).set_fontsize('x-large')

        highest_color = .9 if len(fs_list) > 1 else .6
        ax_fz.set_prop_cycle(plt.cycler('color', plt.cm.Greens(np.linspace(0, highest_color, len(fs_list)+1)[1:])))
        for j, f in enumerate(fs_list):
            ax_fz.plot(ts, f, lw=lw_f, label='$f_{'+str(j+1)+'}$' if len(fs_list) > 1 else '$f$')

        ax_fz.set_prop_cycle(plt.cycler('color', plt.cm.Reds(np.linspace(0, highest_color, len(zs_list)+1)[1:])))
        for j, z in enumerate(zs_list):
            ax_fz.plot(ts, z, lw=lw_z, label='Approximation for $f_{'+str(j+1)+'}$' if len(zs_list) > 1 else 'Approximation')

        ax_fz.legend(loc='best', fancybox=True, framealpha=0.7)
        ax_fz.set_ylim((f_lim[0]-.5, f_lim[1]+.5))
        draw_axis_lines(ax_fz, ['left'])

        # Plotting the projections of the network activity onto the leading principal components
        ax_x = fig.add_subplot(gs[1])
        draw_axis_lines(ax_x, ['bottom'])
        ax_x.set_xlabel('Time (ms)')
        ax_x.xaxis.set_label_coords(1.05, -0.025)
        add_collection_curves(ax_x, ts, xs_list, y_lim=(x_lim[0]-.1, x_lim[1]+.1), Δ=Δ,
                              labels=['PC ${}$'.format(len(xs_list)-i) for i in range(len(xs_list))])

        # Plotting the PCA eigenvalue spectrum (log scale)
        ax_w = fig.add_subplot(gs[2])
        ax_w.semilogy(eigvals, color='blue', label='eigenvalues (logscale)')
        ax_w.legend(loc='best', fancybox=True, framealpha=0.7)
        draw_axis_lines(ax_w, ['left', 'bottom'])
        ax_w.set_xlabel('Index')

        fig.suptitle("Principal Component Analysis of Network Activity").set_fontsize('xx-large')
        self.fig_eig = fig
        return fig

    def principal_components(self, t_max, nb_eig=8):
        """Run the network in test mode for ``t_max``, compute the PCA of its firing rates,
        and plot the readout reconstructed from the ``nb_eig`` leading principal components."""
        #------------------------------------------------------------
        # TESTING phase
        print('Testing...')

        ts = np.arange(self.time_elapsed, self.time_elapsed+t_max, self.dt)
        xs_list = []
        for _ in ts:
            self.step(train_test='test')
            xs_list.append(self.r)
        xs_list = np.array(xs_list)

        self.eig_vec, self.proj, self.eig_val = PCA(xs_list, nb_eig=nb_eig, return_matrix=True, return_eigenvalues=True)

        f_time = self.f(ts)
        if len(f_time.shape) == 1:
            f_time = f_time.reshape([-1, 1])
        fs_list = np.array(list(zip(*f_time)))

        # Projection over the leading principal components
        proj_w = self.w.T.dot(self.eig_vec)
        z_eig = self.proj.dot(proj_w.T)
        if len(z_eig.shape) == 1:
            z_eig = z_eig.reshape([-1, 1])
        z_eig_list = np.array(list(zip(*z_eig)))

        self.eig_val = self.eig_val[::-1]

        self._principal_components_figure(ts, fs_list, z_eig_list, self.proj.T, self.eig_val).show()
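
# A minimal usage sketch, assuming this module is run directly; it follows the pattern of the
# example in the ``FORCE_sequence`` docstring, using the default triangle target, and displays
# the FORCE training-sequence figure.
if __name__ == "__main__":
    net = NetworkA()           # default parameters: N_G = 1000, g_GG = 1.5, f = triangle
    net.FORCE_sequence(600)    # 200 ms spontaneous activity, 200 ms training, 200 ms testing
    plt.show()                 # keep the figure window open (plt comes from .utils)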