Skip to content

Analysis API Reference

nelpy.analysis

This is the nelpy analysis sub-package.

nelpy.analysis provides several commonly used analyses.

column_cycle_array(posterior, amt=None)

Cycle each column of the posterior matrix by a random or specified amount.

Parameters:

Name Type Description Default
posterior ndarray

Posterior probability matrix (position x time).

required
amt array - like or None

Amount to cycle each column. If None, random cycling is used.

None

Returns:

Name Type Description
out ndarray

Cycled posterior matrix.

Source code in nelpy/analysis/replay.py
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
def column_cycle_array(posterior, amt=None):
    """
    Cycle each column of the posterior matrix by a random or specified amount.

    Parameters
    ----------
    posterior : np.ndarray
        Posterior probability matrix (position x time).
    amt : array-like or None, optional
        Amount to cycle each column. If None, random cycling is used.

    Returns
    -------
    out : np.ndarray
        Cycled posterior matrix.
    """
    out = copy.deepcopy(posterior)
    rows, cols = posterior.shape

    if amt is None:
        for col in range(cols):
            if np.isnan(np.sum(posterior[:, col])):
                continue
            else:
                out[:, col] = np.roll(posterior[:, col], np.random.randint(1, rows))
    else:
        if len(amt) == cols:
            for col in range(cols):
                if np.isnan(np.sum(posterior[:, col])):
                    continue
                else:
                    out[:, col] = np.roll(posterior[:, col], int(amt[col]))
        else:
            raise TypeError("amt does not seem to be the correct shape!")
    return out

fmpt(P)

Calculates the matrix of first mean passage times for an ergodic transition probability matrix.

Parameters:

Name Type Description Default
P

an ergodic Markov transition probability matrix

required

Returns:

Name Type Description
M array(kxk)

elements are the expected value for the number of intervals required for a chain starting in state i to first enter state j If i=j then this is the recurrence time.

Examples:

>>> import numpy as np
>>> p = np.array([[0.5, 0.25, 0.25], [0.5, 0, 0.5], [0.25, 0.25, 0.5]])
>>> fm = fmpt(p)
>>> fm
array([[ 2.5       ,  4.        ,  3.33333333],
       [ 2.66666667,  5.        ,  2.66666667],
       [ 3.33333333,  4.        ,  2.5       ]])
Thus, if it is raining today in Oz we can expect a nice day to come
along in another 4 days, on average, and snow to hit in 3.33 days. We can
expect another rainy day in 2.5 days. If it is nice today in Oz, we would
experience a change in the weather (either rain or snow) in 2.67 days from
today. (That wicked witch can only die once so I reckon that is the
ultimate absorbing state).

Notes ----- Uses formulation (and examples on p. 218) in Kemeny and Snell (1976) [1]_ References


.. [1] Kemeny, John, G. and J. Laurie Snell (1976) Finite Markov Chains. Springer-Verlag. Berlin

Source code in nelpy/analysis/ergodic.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def fmpt(P):
    """
    Calculates the matrix of first mean passage times for an
    ergodic transition probability matrix.
    Parameters
    ----------
    P    : array (kxk)
           an ergodic Markov transition probability matrix
    Returns
    -------
    M    : array (kxk)
           elements are the expected value for the number of intervals
           required for  a chain starting in state i to first enter state j
           If i=j then this is the recurrence time.

    Examples
    --------
    >>> import numpy as np
    >>> p = np.array([[0.5, 0.25, 0.25], [0.5, 0, 0.5], [0.25, 0.25, 0.5]])
    >>> fm = fmpt(p)
    >>> fm
    array([[ 2.5       ,  4.        ,  3.33333333],
           [ 2.66666667,  5.        ,  2.66666667],
           [ 3.33333333,  4.        ,  2.5       ]])
    Thus, if it is raining today in Oz we can expect a nice day to come
    along in another 4 days, on average, and snow to hit in 3.33 days. We can
    expect another rainy day in 2.5 days. If it is nice today in Oz, we would
    experience a change in the weather (either rain or snow) in 2.67 days from
    today. (That wicked witch can only die once so I reckon that is the
    ultimate absorbing state).

    Notes -----
    Uses formulation (and examples on p. 218) in Kemeny and Snell (1976) [1]_
    References
    ----------

    .. [1] Kemeny, John, G. and J. Laurie Snell (1976) Finite Markov
       Chains. Springer-Verlag. Berlin
    """
    P = np.asarray(P)
    A = np.zeros_like(P)
    ss = steady_state(P)
    k = ss.shape[0]
    for i in range(k):
        A[:, i] = ss.flatten()
    A = A.T
    identity_matrix = np.identity(k)
    Z = la.inv(identity_matrix - P + A)
    E = np.ones_like(Z)
    D = np.diag(1.0 / np.diag(A))
    Zdg = np.diag(np.diag(Z))
    M = (identity_matrix - Z + np.multiply(E, Zdg)) @ D
    return M

get_significant_events(scores, shuffled_scores, q=95)

Return the significant events based on percentiles.

Parameters:

Name Type Description Default
scores ndarray

Scores for each event.

required
shuffled_scores ndarray

Shuffled scores for each event and shuffle.

required
q float

Percentile to compute (default is 95).

95

Returns:

Name Type Description
sig_event_idx ndarray

Indices of significant events.

pvalues ndarray

Monte Carlo p-values for each event.

Source code in nelpy/analysis/replay.py
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
def get_significant_events(scores, shuffled_scores, q=95):
    """
    Return the significant events based on percentiles.

    Parameters
    ----------
    scores : np.ndarray
        Scores for each event.
    shuffled_scores : np.ndarray
        Shuffled scores for each event and shuffle.
    q : float, optional
        Percentile to compute (default is 95).

    Returns
    -------
    sig_event_idx : np.ndarray
        Indices of significant events.
    pvalues : np.ndarray
        Monte Carlo p-values for each event.
    """

    n, _ = shuffled_scores.shape
    r = np.sum(shuffled_scores >= scores, axis=0)
    pvalues = (r + 1) / (n + 1)

    sig_event_idx = np.argwhere(
        scores > np.percentile(shuffled_scores, axis=0, q=q)
    ).squeeze()

    return np.atleast_1d(sig_event_idx), np.atleast_1d(pvalues)

linregress_array(posterior)

Perform linear regression on the posterior matrix, and return the slope, intercept, and R^2 value.

Parameters:

Name Type Description Default
posterior ndarray

Posterior probability matrix (position x time).

required

Returns:

Name Type Description
slope float

Slope of the best-fit line.

intercept float

Intercept of the best-fit line.

r2 float

R^2 value of the fit.

Source code in nelpy/analysis/replay.py
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
def linregress_array(posterior):
    """
    Perform linear regression on the posterior matrix, and return the slope, intercept, and R^2 value.

    Parameters
    ----------
    posterior : np.ndarray
        Posterior probability matrix (position x time).

    Returns
    -------
    slope : float
        Slope of the best-fit line.
    intercept : float
        Intercept of the best-fit line.
    r2 : float
        R^2 value of the fit.
    """

    mode_pth = get_mode_pth_from_array(posterior)

    y = mode_pth
    x = np.arange(len(y))
    x = x[~np.isnan(y)]
    y = y[~np.isnan(y)]

    if len(y) > 0:
        slope, intercept, rvalue, pvalue, stderr = stats.linregress(x, y)
        return slope, intercept, rvalue**2
    else:
        return np.nan, np.nan, np.nan

linregress_bst(bst, tuningcurve)

Perform linear regression on all the events in bst, and return the slopes, intercepts, and R^2 values.

Parameters:

Name Type Description Default
bst BinnedSpikeTrainArray

Binned spike train array containing all candidate events.

required
tuningcurve TuningCurve1D

Tuning curve for decoding.

required

Returns:

Name Type Description
slopes ndarray

Slopes for each event.

intercepts ndarray

Intercepts for each event.

r2values ndarray

R^2 values for each event.

Source code in nelpy/analysis/replay.py
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
def linregress_bst(bst, tuningcurve):
    """
    Perform linear regression on all the events in bst, and return the slopes, intercepts, and R^2 values.

    Parameters
    ----------
    bst : BinnedSpikeTrainArray
        Binned spike train array containing all candidate events.
    tuningcurve : TuningCurve1D
        Tuning curve for decoding.

    Returns
    -------
    slopes : np.ndarray
        Slopes for each event.
    intercepts : np.ndarray
        Intercepts for each event.
    r2values : np.ndarray
        R^2 values for each event.
    """

    posterior, bdries, mode_pth, mean_pth = decode(bst=bst, ratemap=tuningcurve)

    slopes = np.zeros(bst.n_epochs)
    intercepts = np.zeros(bst.n_epochs)
    r2values = np.zeros(bst.n_epochs)
    for idx in range(bst.n_epochs):
        y = mode_pth[bdries[idx] : bdries[idx + 1]]
        x = np.arange(bdries[idx], bdries[idx + 1], step=1)
        x = x[~np.isnan(y)]
        y = y[~np.isnan(y)]

        if len(y) > 0:
            slope, intercept, rvalue, pvalue, stderr = stats.linregress(x, y)
            slopes[idx] = slope
            intercepts[idx] = intercept
            r2values[idx] = rvalue**2
        else:
            slopes[idx] = np.nan
            intercepts[idx] = np.nan
            r2values[idx] = np.nan  #
    #     if bst.n_epochs == 1:
    #         return np.asscalar(slopes), np.asscalar(intercepts), np.asscalar(r2values)
    return slopes, intercepts, r2values

linregress_ting(bst, tuningcurve, n_shuffles=250)

Perform linear regression on all the events in bst, and return the R^2 values.

Parameters:

Name Type Description Default
bst BinnedSpikeTrainArray

Binned spike train array containing all candidate events.

required
tuningcurve TuningCurve1D

Tuning curve for decoding.

required
n_shuffles int

Number of shuffles for the null distribution (default is 250).

250

Returns:

Name Type Description
r2values ndarray

R^2 values for each event.

r2values_shuffled ndarray

Shuffled R^2 values for each event and shuffle.

Source code in nelpy/analysis/replay.py
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
def linregress_ting(bst, tuningcurve, n_shuffles=250):
    """
    Perform linear regression on all the events in bst, and return the R^2 values.

    Parameters
    ----------
    bst : BinnedSpikeTrainArray
        Binned spike train array containing all candidate events.
    tuningcurve : TuningCurve1D
        Tuning curve for decoding.
    n_shuffles : int, optional
        Number of shuffles for the null distribution (default is 250).

    Returns
    -------
    r2values : np.ndarray
        R^2 values for each event.
    r2values_shuffled : np.ndarray
        Shuffled R^2 values for each event and shuffle.
    """

    if float(n_shuffles).is_integer:
        n_shuffles = int(n_shuffles)
    else:
        raise ValueError("n_shuffles must be an integer!")

    posterior, bdries, mode_pth, mean_pth = decode(bst=bst, ratemap=tuningcurve)

    #     bdries = np.insert(np.cumsum(bst.lengths), 0, 0)
    r2values = np.zeros(bst.n_epochs)
    r2values_shuffled = np.zeros((n_shuffles, bst.n_epochs))
    for idx in range(bst.n_epochs):
        y = mode_pth[bdries[idx] : bdries[idx + 1]]
        x = np.arange(bdries[idx], bdries[idx + 1], step=1)
        x = x[~np.isnan(y)]
        y = y[~np.isnan(y)]

        if len(y) > 0:
            slope, intercept, rvalue, pvalue, stderr = stats.linregress(x, y)
            r2values[idx] = rvalue**2
        else:
            r2values[idx] = np.nan  #
        for ss in range(n_shuffles):
            if len(y) > 0:
                slope, intercept, rvalue, pvalue, stderr = stats.linregress(
                    np.random.permutation(x), y
                )
                r2values_shuffled[ss, idx] = rvalue**2
            else:
                r2values_shuffled[ss, idx] = (
                    np.nan
                )  # event contained NO decoded activity... unlikely or even impossible with current code

    #     sig_idx = np.argwhere(r2values[0,:] > np.percentile(r2values, q=q, axis=0))
    #     np.argwhere(((R2[1:,:] >= R2[0,:]).sum(axis=0))/(R2.shape[0]-1)<0.05) # equivalent to above
    if n_shuffles > 0:
        return r2values, r2values_shuffled
    return r2values

pooled_time_swap_bst(bst)

Time swap on BinnedSpikeTrainArray, swapping within entire bst.

Parameters:

Name Type Description Default
bst BinnedSpikeTrainArray

Binned spike train array to swap.

required

Returns:

Name Type Description
out BinnedSpikeTrainArray

Time-swapped spike train array.

Source code in nelpy/analysis/replay.py
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
def pooled_time_swap_bst(bst):
    """
    Time swap on BinnedSpikeTrainArray, swapping within entire bst.

    Parameters
    ----------
    bst : BinnedSpikeTrainArray
        Binned spike train array to swap.

    Returns
    -------
    out : BinnedSpikeTrainArray
        Time-swapped spike train array.
    """
    out = copy.deepcopy(bst)  # should this be deep? YES! Oh my goodness, yes!
    shuffled = np.random.permutation(bst.n_bins)
    out._data = out._data[:, shuffled]
    return out

score_hmm_logprob_cumulative(bst, hmm, normalize=False)

Score events in a BinnedSpikeTrainArray by computing the cumulative log probability under the model.

Parameters:

Name Type Description Default
bst BinnedSpikeTrainArray

Binned spike train array containing all candidate events.

required
hmm PoissonHMM

Trained hidden Markov model.

required
normalize bool

If True, log probabilities will be normalized by their sequence lengths.

False

Returns:

Name Type Description
logprob ndarray

Cumulative log probabilities for each event in bst.

Source code in nelpy/analysis/replay.py
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
def score_hmm_logprob_cumulative(bst, hmm, normalize=False):
    """
    Score events in a BinnedSpikeTrainArray by computing the cumulative log probability under the model.

    Parameters
    ----------
    bst : BinnedSpikeTrainArray
        Binned spike train array containing all candidate events.
    hmm : PoissonHMM
        Trained hidden Markov model.
    normalize : bool, optional
        If True, log probabilities will be normalized by their sequence lengths.

    Returns
    -------
    logprob : np.ndarray
        Cumulative log probabilities for each event in bst.
    """

    logprob = np.atleast_1d(hmm._cum_score_per_bin(bst))
    if normalize:
        cumlengths = []
        for evt in bst.lengths:
            cumlengths.extend(np.arange(1, evt + 1).tolist())
        cumlengths = np.array(cumlengths)
        logprob = np.atleast_1d(logprob) / cumlengths

    return logprob

score_hmm_time_resolved(bst, hmm, n_shuffles=250, normalize=False)

Score sequences using a hidden Markov model and a model where the transition probability matrix has been shuffled (time-resolved).

Parameters:

Name Type Description Default
bst BinnedSpikeTrainArray

Binned spike train array containing all candidate events.

required
hmm PoissonHMM

Trained hidden Markov model.

required
n_shuffles int

Number of shuffles for the null distribution. Default is 250.

250
normalize bool

If True, normalize the scores by event lengths.

False

Returns:

Name Type Description
scores ndarray

Log probabilities for each event.

shuffled ndarray

Shuffled log probabilities for each event and shuffle.

Source code in nelpy/analysis/replay.py
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
def score_hmm_time_resolved(bst, hmm, n_shuffles=250, normalize=False):
    """
    Score sequences using a hidden Markov model and a model where the transition probability matrix has been shuffled (time-resolved).

    Parameters
    ----------
    bst : BinnedSpikeTrainArray
        Binned spike train array containing all candidate events.
    hmm : PoissonHMM
        Trained hidden Markov model.
    n_shuffles : int, optional
        Number of shuffles for the null distribution. Default is 250.
    normalize : bool, optional
        If True, normalize the scores by event lengths.

    Returns
    -------
    scores : np.ndarray
        Log probabilities for each event.
    shuffled : np.ndarray
        Shuffled log probabilities for each event and shuffle.
    """

    if float(n_shuffles).is_integer:
        n_shuffles = int(n_shuffles)
    else:
        raise ValueError("n_shuffles must be an integer!")

    hmm_shuffled = copy.deepcopy(hmm)
    Lbraw = score_hmm_logprob_cumulative(bst=bst, hmm=hmm, normalize=normalize)

    # per event, compute L(:b|raw) - L(:b-1|raw)
    Lb = copy.deepcopy(Lbraw)

    cumLengths = np.cumsum(bst.lengths)
    cumLengths = np.insert(cumLengths, 0, 0)

    for ii in range(bst.n_epochs):
        LE = cumLengths[ii]
        RE = cumLengths[ii + 1]
        Lb[LE + 1 : RE] -= Lbraw[LE : RE - 1]

    n_bins = bst.n_bins
    shuffled = np.zeros((n_shuffles, n_bins))
    for ii in range(n_shuffles):
        hmm_shuffled.transmat_ = shuffle_transmat(hmm_shuffled.transmat_)
        Lbtmat = score_hmm_logprob_cumulative(
            bst=bst, hmm=hmm_shuffled, normalize=normalize
        )

        # per event, compute L(:b|tmat) - L(:b-1|raw)
        NL = copy.deepcopy(Lbtmat)
        for jj in range(bst.n_epochs):
            LE = cumLengths[jj]
            RE = cumLengths[jj + 1]
            NL[LE + 1 : RE] -= Lbraw[LE : RE - 1]

        shuffled[ii, :] = NL

    scores = Lb

    return scores, shuffled

steady_state(P)

Calculates the steady state probability vector for a regular Markov transition matrix P

Parameters:

Name Type Description Default
P
   an ergodic Markov transition probability matrix
required

Returns:

Name Type Description
implicit array(kx1)

steady state distribution

Examples:

Taken from Kemeny and Snell. [1]_ Land of Oz example where the states are Rain, Nice and Snow - so there is 25 percent chance that if it rained in Oz today, it will snow tomorrow, while if it snowed today in Oz there is a 50 percent chance of snow again tomorrow and a 25 percent chance of a nice day (nice, like when the witch with the monkeys is melting).

>>> import numpy as np
>>> p = np.array([[0.5, 0.25, 0.25], [0.5, 0, 0.5], [0.25, 0.25, 0.5]])
>>> steady_state(p)
array([[ 0.4],
       [ 0.2],
       [ 0.4]])
Thus, the long run distribution for Oz is to have 40 percent of the
days classified as Rain, 20 percent as Nice, and 40 percent as Snow
(states are mutually exclusive).
Source code in nelpy/analysis/ergodic.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def steady_state(P):
    """
    Calculates the steady state probability vector for a regular Markov
    transition matrix P
    Parameters
    ----------
    P        : array (kxk)
               an ergodic Markov transition probability matrix
    Returns
    -------
    implicit : array (kx1)
               steady state distribution
    Examples
    --------
    Taken from Kemeny and Snell. [1]_ Land of Oz example where the states are
    Rain, Nice and Snow - so there is 25 percent chance that if it
    rained in Oz today, it will snow tomorrow, while if it snowed today in
    Oz there is a 50 percent chance of snow again tomorrow and a 25
    percent chance of a nice day (nice, like when the witch with the monkeys
    is melting).
    >>> import numpy as np
    >>> p = np.array([[0.5, 0.25, 0.25], [0.5, 0, 0.5], [0.25, 0.25, 0.5]])
    >>> steady_state(p)
    array([[ 0.4],
           [ 0.2],
           [ 0.4]])
    Thus, the long run distribution for Oz is to have 40 percent of the
    days classified as Rain, 20 percent as Nice, and 40 percent as Snow
    (states are mutually exclusive).
    """

    v, d = la.eig(np.transpose(P))

    # for a regular P maximum eigenvalue will be 1
    mv = max(v.real)  # Use real part for comparison
    # find its position
    i = v.real.tolist().index(mv)

    # normalize eigenvector corresponding to the eigenvalue 1
    # Take real part to avoid complex warning
    eigenvector = d[:, i].real
    return eigenvector / np.sum(eigenvector)

time_swap_array(posterior)

Time swap.

Note: it is often possible to simply shuffle the time bins, and not the actual data, for computational efficiency. Still, this function works as expected.

Parameters:

Name Type Description Default
posterior ndarray

Posterior probability matrix (position x time).

required

Returns:

Name Type Description
out ndarray

Time-swapped posterior matrix.

Source code in nelpy/analysis/replay.py
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
def time_swap_array(posterior):
    """
    Time swap.

    Note: it is often possible to simply shuffle the time bins, and not the actual data, for computational
    efficiency. Still, this function works as expected.

    Parameters
    ----------
    posterior : np.ndarray
        Posterior probability matrix (position x time).

    Returns
    -------
    out : np.ndarray
        Time-swapped posterior matrix.
    """
    out = copy.deepcopy(posterior)
    rows, cols = posterior.shape

    colidx = np.arange(cols)
    shuffle_cols = np.random.permutation(colidx)
    out = out[:, shuffle_cols]

    return out

trajectory_score_array(posterior, slope=None, intercept=None, w=None, weights=None, normalize=False)

Compute the trajectory score for a given posterior matrix and line parameters.

Parameters:

Name Type Description Default
posterior ndarray

Posterior probability matrix (position x time).

required
slope float

Slope of the line. If None, estimated from the data.

None
intercept float

Intercept of the line. If None, estimated from the data.

None
w int

Half band width for calculating the trajectory score. Default is 0.

None
weights array - like

Weights for the band around the line (not yet implemented).

None
normalize bool

If True, normalize the score by the number of non-NaN bins.

False

Returns:

Name Type Description
score float

Trajectory score for the event.

Source code in nelpy/analysis/replay.py
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
def trajectory_score_array(
    posterior, slope=None, intercept=None, w=None, weights=None, normalize=False
):
    """
    Compute the trajectory score for a given posterior matrix and line parameters.

    Parameters
    ----------
    posterior : np.ndarray
        Posterior probability matrix (position x time).
    slope : float, optional
        Slope of the line. If None, estimated from the data.
    intercept : float, optional
        Intercept of the line. If None, estimated from the data.
    w : int, optional
        Half band width for calculating the trajectory score. Default is 0.
    weights : array-like, optional
        Weights for the band around the line (not yet implemented).
    normalize : bool, optional
        If True, normalize the score by the number of non-NaN bins.

    Returns
    -------
    score : float
        Trajectory score for the event.
    """

    rows, cols = posterior.shape

    if w is None:
        w = 0
    if not float(w).is_integer:
        raise ValueError("w has to be an integer!")
    if slope is None or intercept is None:
        slope, intercept, _ = linregress_array(posterior=posterior)

    x = np.arange(cols)
    line_y = np.round((slope * x + intercept))  # in position bin #s

    # idea: cycle each column so that the top w rows are the band surrounding the regression line

    if np.isnan(slope):  # this will happen if we have 0 or only 1 decoded bins
        return np.nan
    else:
        temp = column_cycle_array(posterior, -line_y + w)

    if normalize:
        num_non_nan_bins = round(np.nansum(posterior))
    else:
        num_non_nan_bins = 1

    return np.nansum(temp[: 2 * w + 1, :]) / num_non_nan_bins

trajectory_score_bst(bst, tuningcurve, w=None, n_shuffles=250, weights=None, normalize=False)

Compute the trajectory scores from Davidson et al. for each event in the BinnedSpikeTrainArray.

Parameters:

Name Type Description Default
bst BinnedSpikeTrainArray

Binned spike train array containing all candidate events.

required
tuningcurve TuningCurve1D

Tuning curve to decode events in bst.

required
w int

Half band width for calculating the trajectory score. Default is 0.

None
n_shuffles int

Number of shuffles for the null distribution. Default is 250.

250
weights array - like

Weights for the band around the line (not yet implemented).

None
normalize bool

If True, normalize the score by the number of non-NaN bins.

False

Returns:

Name Type Description
scores ndarray

Trajectory scores for each event.

scores_time_swap ndarray

Shuffled scores using time swap.

scores_col_cycle ndarray

Shuffled scores using column cycle.

Source code in nelpy/analysis/replay.py
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
def trajectory_score_bst(
    bst, tuningcurve, w=None, n_shuffles=250, weights=None, normalize=False
):
    """
    Compute the trajectory scores from Davidson et al. for each event in the BinnedSpikeTrainArray.

    Parameters
    ----------
    bst : BinnedSpikeTrainArray
        Binned spike train array containing all candidate events.
    tuningcurve : TuningCurve1D
        Tuning curve to decode events in bst.
    w : int, optional
        Half band width for calculating the trajectory score. Default is 0.
    n_shuffles : int, optional
        Number of shuffles for the null distribution. Default is 250.
    weights : array-like, optional
        Weights for the band around the line (not yet implemented).
    normalize : bool, optional
        If True, normalize the score by the number of non-NaN bins.

    Returns
    -------
    scores : np.ndarray
        Trajectory scores for each event.
    scores_time_swap : np.ndarray
        Shuffled scores using time swap.
    scores_col_cycle : np.ndarray
        Shuffled scores using column cycle.
    """

    if w is None:
        w = 0
    if not float(w).is_integer:
        raise ValueError("w has to be an integer!")

    if float(n_shuffles).is_integer:
        n_shuffles = int(n_shuffles)
    else:
        raise ValueError("n_shuffles must be an integer!")

    posterior, bdries, mode_pth, mean_pth = decode(bst=bst, ratemap=tuningcurve)

    # idea: cycle each column so that the top w rows are the band
    # surrounding the regression line

    scores = np.zeros(bst.n_epochs)
    if n_shuffles > 0:
        scores_time_swap = np.zeros((n_shuffles, bst.n_epochs))
        scores_col_cycle = np.zeros((n_shuffles, bst.n_epochs))

    for idx in range(bst.n_epochs):
        posterior_array = posterior[:, bdries[idx] : bdries[idx + 1]]
        scores[idx] = trajectory_score_array(
            posterior=posterior_array, w=w, normalize=normalize
        )
        for shflidx in range(n_shuffles):
            # time swap:

            posterior_ts = time_swap_array(posterior_array)
            posterior_cs = column_cycle_array(posterior_array)
            scores_time_swap[shflidx, idx] = trajectory_score_array(
                posterior=posterior_ts, w=w, normalize=normalize
            )
            scores_col_cycle[shflidx, idx] = trajectory_score_array(
                posterior=posterior_cs, w=w, normalize=normalize
            )

    if n_shuffles > 0:
        return scores, scores_time_swap, scores_col_cycle
    return scores