Source code for dansy.network_separation_helpers

import warnings
import networkx as nx
import numpy as np
from dansy.dansy import dansy

def mean_minimum_dist(G_nodes, ref_net_params_dict):
    '''
    Calculates the mean of the minimum shortest path lengths between the nodes of a network, measured along a reference network. Nodes listed as isolates in the reference are not queried and therefore contribute 0 to the sum, but they are still counted in the denominator.

    Parameters:
    -----------
        - G_nodes: networkx Graph | iterable of nodes
            - The network of interest, or directly its nodes. Any non-Graph input is converted to a set, so lists and other iterables of hashable nodes are accepted.
        - ref_net_params_dict: dict
            - Reference network information. The 'isolates' entry is read here and the whole dict is forwarded to spl_inner, which reads its 'spl' and 'penalty' entries (plus 'CCs' and 'cc_diameters' for the dynamic penalty).

    Returns:
    --------
        - mean_min_dist: float
            - Sum of the per-node minimum distances divided by the total number of nodes, or NaN when no nodes were provided.
    '''
    ref_isolates = ref_net_params_dict['isolates']

    if isinstance(G_nodes, nx.Graph):
        nodesOI = set(G_nodes.nodes())
    else:
        # Coerce to a set so that the set operations below also work when a list (or any other iterable) is passed.
        nodesOI = set(G_nodes)

    # Nothing to average over
    if not nodesOI:
        return np.nan

    # Don't bother checking the expected isolates. They are accounted for through the denominator below, where they act as 0s.
    nodes_2_check = nodesOI.difference(ref_isolates)

    # Minimum shortest path length for each remaining node to any other node of the network
    spl = spl_inner(nodes_2_check, nodesOI, ref_net_params_dict)

    # Dividing by the full node count ensures the isolates are part of the average
    return np.sum(spl)/len(nodesOI)

def spl_inner(nodes_2_check, all_nodes, ref_net_analysis_parameters):
    '''
    An inner calculation function that gets, for each node of interest, the shortest path length to the closest target node (excluding the node itself) using precomputed reference shortest path lengths. Nodes of interest that cannot reach any target node are given a penalty term that is defined in the reference network information provided.

    Parameters:
    -----------
        - nodes_2_check: list | set
            - Nodes of interest that act as the sources of the shortest path lengths to be gathered. Nodes missing from the reference shortest path length dict are skipped.
        - all_nodes: list | set
            - The target nodes that the shortest path distance from the nodes of interest is to be queried for. Targets missing from the top-level keys of the reference shortest path length dict are always ignored, regardless of which source nodes were passed.
        - ref_net_analysis_parameters: dict
            - A dictionary that contains several components of the reference, background network that are used in the calculation. Reads 'spl' and 'penalty', and additionally 'CCs' and 'cc_diameters' when the penalty is 'dynamic'. See build_network_reference_dict for more details.

    Returns:
    --------
        - spl: list
            - One value per retained node of interest: the minimum distance to a target node, or the penalty when no target node is reachable. For the dynamic penalty this is the 'cc_diameters' value of the node plus 1. Values follow the iteration order of a set, so the order should not be relied on.

    Raises:
    -------
        - TypeError
            - If the penalty is neither an int nor the string 'dynamic'.
        - ValueError
            - If the dynamic penalty is requested but 'CCs' or 'cc_diameters' is missing, or if connected components overlap the nodes of interest while no diameters are available.
    '''

    reference_spl_dict = ref_net_analysis_parameters['spl']
    penalty_input = ref_net_analysis_parameters['penalty']

    # Verify that the penalty that was to be used is an appropriate format and can be used for downstream analysis
    if isinstance(penalty_input, int):
        # A constant term was provided
        dynamic_flag = False
        penalty = penalty_input
    elif penalty_input == 'dynamic':
        # The penalty is derived per node from the connected component diameters
        dynamic_flag = True
    else:
        raise TypeError('Inappropriate penalty term input')

    # Some prechecking for the dynamic method
    if dynamic_flag:
        if 'CCs' not in ref_net_analysis_parameters or 'cc_diameters' not in ref_net_analysis_parameters:
            raise ValueError('An invalid reference data dictionary was provided. Please run build_network_reference_dict')
        cc_diams = ref_net_analysis_parameters['cc_diameters']

        # Fail-safe check for debugging: connected components that contain nodes of interest must come with diameters. The components are only scanned when the check could actually fail.
        if nodes_2_check and not cc_diams:
            if any(cc.intersection(nodes_2_check) for cc in ref_net_analysis_parameters['CCs']):
                raise ValueError('Whoops something went very wrong and this should not happen.')

    # In some instances there may be some nodes removed from the spl dict (via pruning), so both the sources and the targets are restricted to the nodes that are still a part of it. Restricting the targets unconditionally keeps the result for a node independent of which other source nodes were passed, and building the set once gives constant time membership tests below.
    known_nodes = reference_spl_dict.keys()
    sources = set(nodes_2_check).intersection(known_nodes)
    targets = set(all_nodes).intersection(known_nodes)

    # Now getting the shortest path lengths based on the provided parameters
    spl = []
    for node in sources:
        # Distances to every reachable target, leaving out the self-reference that is always 0
        dists = [
            dist
            for other, dist in reference_spl_dict[node].items()
            if other != node and other in targets
        ]

        if dists:
            spl.append(min(dists))
        elif dynamic_flag:
            # No target is reachable from this node: penalize with the diameter of its connected component plus 1
            spl.append(cc_diams[node] + 1)
        else:
            spl.append(penalty)

    return spl


def network_separation(G_in, H_in, ref_G_data, mmd_verbose = False, force_run = False, verbose = True):
    '''
    Calculates the network separation between two networks of interest that lie on a common larger, reference network.

    Parameters:
    -----------
        - G_in, H_in: networkx Graph | DomainNgramNetwork
            - networks of interest
        - ref_G_data: dict
            - dict containing information about the larger, common network that the two given networks are subnetworks of. An optional 'mean_dist_penalty' entry controls the penalty used for the mean minimum distance of each network: an int is used as that penalty, True reuses the 'penalty' entry, and False or a missing entry sets it to 0.
        - mmd_verbose: bool (Optional, to be deprecated)
            - Whether the penalty used for the mean minimum distance is printed.
        - force_run: bool (Optional)
            - Whether the network separation should be run even if it does not meet minimum values necessary.
        - verbose: bool (Optional)
            - Whether a warning is issued when force_run lets an undersized network through.

    Returns:
    --------
        - s : float
            - the network separation of two different subnetworks. NaN when either network has no nodes.

    Raises:
    -------
        - ValueError
            - If 'mean_dist_penalty' is neither an int nor a bool, or if both inputs are dansy objects, at least one has fewer than 20 entries in protsOI, and force_run is False.
    '''

    # Unpacking the reference network information that is used for getting minimum distance values
    ref_isolates = ref_G_data['isolates']

    # Now getting the minimum distances for both the sub-networks of interest
    # As part of the mean minimum distance calculation there is a secondary option to set penalties for disconnected nodes. If it is not provided then will set the penalty to zero
    mmd_ref = {**ref_G_data} # Need to create a copy otherwise can overwrite the penalty value by accident
    if 'mean_dist_penalty' in ref_G_data:
        mean_dist_penalty = ref_G_data['mean_dist_penalty']
        # bool has to be tested before int: bool is a subclass of int, so testing int first would swallow True/False and never reach this branch.
        if isinstance(mean_dist_penalty, bool):
            if not mean_dist_penalty:
                mmd_ref['penalty'] = 0
            # Otherwise the copy already has the penalty term passed
        elif isinstance(mean_dist_penalty, int):
            # To allow a separate penalty term for the mean shortest path distance calculation from the network separation
            mmd_ref['penalty'] = mean_dist_penalty
        else:
            raise ValueError('An improper penalty value was provided for calculating the mean minimum distance.')
    else:
        mmd_ref['penalty'] = 0

    # After some parameter sweeps there is a rough cutoff for networks that represent ~20 proteins that tend to exhibit wide ranges in network separation values that are not as informative to network characteristics as they frequently will have larger isolate fractions than landing within a large connected component. Here checking that both networks meet the minimum size otherwise raising an error. (Unless the force run flag is present.)
    if isinstance(G_in, dansy) and isinstance(H_in, dansy):
        if (len(G_in.protsOI) < 20) or (len(H_in.protsOI) < 20):
            if force_run:
                if verbose:
                    warnings.warn('At least one network does not reach recommended minimum size, but will still be analyzed.')
            else:
                raise ValueError('At least one network does not reach recommended minimum size.')
        G = G_in.G
        H = H_in.G
    else:
        G = G_in
        H = H_in

    # Getting the list/set of nodes for doing calculations on
    G_v = set(G.nodes())
    H_v = set(H.nodes())

    # Two empty networks would otherwise divide by zero below. A single empty network already yields NaN through its mean minimum distance, so NaN is returned here as well.
    total_nodes = G.number_of_nodes()+H.number_of_nodes()
    if total_nodes == 0:
        return np.nan

    if mmd_verbose: # This is for debugging purposes but can be retained if needed.
        print(f"Input penalty for the mean minimum distance is {mmd_ref['penalty']}")

    G1_mmd = mean_minimum_dist(G_v,mmd_ref)
    G2_mmd = mean_minimum_dist(H_v,mmd_ref)

    # To find the separation between the two networks will find the minimum distance from nodes in one network to nodes in the other network.
    # First finding the overlap in nodes as these will be 0 and don't need to go through the process.
    node_overlap = G_v.intersection(H_v)

    # For the nodes that were isolates in the reference network removing them from running in the for loop as they will not provide any additional information.
    nodesOI_1 = G_v.difference(node_overlap).difference(ref_isolates)
    nodesOI_2 = H_v.difference(node_overlap).difference(ref_isolates)

    # Running through the non-isolate nodes and getting the shortest distances from nodes within one network to nodes in another.
    spl_union = spl_inner(nodesOI_1, H_v, ref_G_data)
    spl_union += spl_inner(nodesOI_2, G_v, ref_G_data)

    # Now for the isolates to account for their contributions and creating a constant 1 path length penalty for those that are not shared between networks
    isols_1 = G_v.intersection(ref_isolates)
    isols_2 = H_v.intersection(ref_isolates)
    isol_correction_1 = len(isols_1.difference(isols_2))
    isol_correction_2 = len(isols_2.difference(isols_1))

    # Getting the average minimum distance across the union of both networks
    union_mmd = (sum(spl_union)+isol_correction_1+isol_correction_2)/total_nodes

    s = union_mmd - (G1_mmd+G2_mmd)/2

    return s
def build_network_reference_dict(ref_ngram_net, penalty = None):
    '''
    This builds the dict that contains the reference network information necessary for calculating the network separation value from a provided DomainNgramNetwork object. It is recommended to use this function to generate the reference dictionary prior to calculating network separation, especially when using a dynamic penalty and comparing several networks, to improve the execution speed of the calculation.

    Parameters:
    -----------
        ref_ngram_net: DomainNgramNetwork
            - A populated Domain N-gram network that will be used as a reference. Only its G attribute (the networkx graph) is read here.
        penalty: str or int (Optional)
            - What type of penalty will be used for network separation. If not specified will default to the dynamic method.

    Returns:
    --------
        ref_data_dict: dict
            - Key-value pairs that are used for network separation calculations.

            Key-Value Pairs:
                - penalty: int | 'dynamic'
                    The penalty term that will be used for shortest path distances
                - spl: dict
                    The shortest path length of all pairs of nodes within the network.
                - isolates: list
                    List of isolates in the network
                - CCs: list
                    List of sets of nodes that are the connected components of the network with more than one node
                - cc_diameters: dict
                    Key-value pairs of nodes and the diameter of the connected component they are a part of. Nodes outside the components listed in CCs have no entry.
                - G: networkx Graph
                    The graph of the full network.
    '''

    # Default value for the penalty term
    if penalty is None:
        penalty = 'dynamic'

    ref_graph = ref_ngram_net.G

    # Shortest path lengths between each node and every node reachable from it
    comp_pw_shortest_path_lens = dict(nx.all_pairs_shortest_path_length(ref_graph))

    # Only connected components with more than one node are retained
    ccs = [cc for cc in nx.connected_components(ref_graph) if len(cc) > 1]

    # Building a diameter dictionary where the keys are each node and the value is the diameter for the connected component including that node.
    # The diameter is the largest shortest path length within the component. Every node reaches exactly the nodes of its own component, so it is read from the lengths computed above without running a second search per component.
    cc_diams = {}
    for cc in ccs:
        d = max(max(comp_pw_shortest_path_lens[node].values()) for node in cc)
        for node in cc:
            cc_diams[node] = d

    ref_data_dict = {
        'spl': comp_pw_shortest_path_lens,
        'penalty': penalty,
        'isolates': list(nx.isolates(ref_graph)),
        'CCs': ccs,
        'cc_diameters': cc_diams,
        'G': ref_graph,
    }

    return ref_data_dict