Source code for floweaver.dataset

import pandas as pd
import networkx as nx

from .partition import Partition


def leaves_below(tree, node):
    return set(sum(([vv for vv in v if tree.out_degree(vv) == 0]
                    for k, v in nx.dfs_successors(tree, node).items()), []))


class Resolver:
    def __init__(self, df, column):
        self.df = df
        self.column = column

    def __iter__(self):
        # XXX hack to avoid __getitem__ being called with integer indices, but
        # to have non-zero len.
        return iter(['keys'])

    def __getitem__(self, k):
        if not self.column:
            col = k
        elif k == 'id':
            col = self.column
        else:
            col = '{}.{}'.format(self.column, k)
        return self.df[col]


def eval_selection(df, column, sel):
    if isinstance(sel, (list, tuple)):
        return df[column].isin(sel)
    elif isinstance(sel, str):
        resolver = Resolver(df, column)
        return df.eval(sel,
                       local_dict={},
                       global_dict={},
                       resolvers=(resolver, ))
    else:
        raise TypeError('Unknown selection type: %s' % type(sel))


[docs]class Dataset: def __init__(self, flows, dim_process=None, dim_material=None, dim_time=None): if dim_process is not None and not dim_process.index.is_unique: raise ValueError('dim_process index not unique') if dim_material is not None and not dim_material.index.is_unique: raise ValueError('dim_material index not unique') if dim_time is not None and not dim_time.index.is_unique: raise ValueError('dim_time index not unique') # Fixed bug: make sure flows index is unique flows = flows.reset_index(drop=True) self._flows = flows self._dim_process = dim_process self._dim_material = dim_material self._dim_time = dim_time self._table = flows if dim_process is not None: self._table = self._table \ .join(dim_process.add_prefix('source.'), on='source') \ .join(dim_process.add_prefix('target.'), on='target') if dim_material is not None: self._table = self._table \ .join(dim_material.add_prefix('material.'), on='material') if dim_time is not None: self._table = self._table \ .join(dim_time.add_prefix('time.'), on='time') def partition(self, dimension, processes=None): """Partition of all values of `dimension` within `processes`""" if processes: q = (self._table.source.isin(processes) | self._table.target.isin(processes)) values = self._table.loc[q, dimension].unique() else: values = self._table[dimension].unique() return Partition.Simple(dimension, values) def apply_view(self, process_groups, bundles, flow_selection=None): return _apply_view(self, process_groups, bundles, flow_selection) def save(self, filename): with pd.HDFStore(filename) as store: store['flows'] = self._flows if self._dim_process is not None: store['dim_process'] = self._dim_process if self._dim_material is not None: store['dim_material'] = self._dim_material if self._dim_time is not None: store['dim_time'] = self._dim_time @classmethod def from_hdf(cls, filename): with pd.HDFStore(filename) as store: return cls(store['flows'], store['dim_process'] if 'dim_process' in store else None, store['dim_material'] if 'dim_material' in store else None, store['dim_time'] if 'dim_time' in store else None) @classmethod def from_csv(cls, flows_filename, dim_process_filename=None, dim_material_filename=None, dim_time_filename=None): def read(filename): if filename is not None: return pd.read_csv(filename).set_index('id') else: return None flows = pd.read_csv(flows_filename) dim_process = read(dim_process_filename) dim_material = read(dim_material_filename) dim_time = read(dim_time_filename) return cls(flows, dim_process, dim_material, dim_time)
def find_flows(flows, source_query, target_query, flow_query=None, ignore_edges=None): """Filter flows according to source_query, target_query, and flow_query. """ if flow_query is not None: flows = flows[eval_selection(flows, '', flow_query)] if source_query is None and target_query is None: raise ValueError('source_query and target_query cannot both be None') elif source_query is None and target_query is not None: qt = eval_selection(flows, 'target', target_query) qs = (~eval_selection(flows, 'source', target_query) & ~flows.index.isin(ignore_edges or [])) elif source_query is not None and target_query is None: qs = eval_selection(flows, 'source', source_query) qt = (~eval_selection(flows, 'target', source_query) & ~flows.index.isin(ignore_edges or [])) else: qs = eval_selection(flows, 'source', source_query) qt = eval_selection(flows, 'target', target_query) f = flows[qs & qt] if source_query is None: internal_source = None else: internal_source = flows[qs & eval_selection(flows, 'target', source_query)] if target_query is None: internal_target = None else: internal_target = flows[qt & eval_selection(flows, 'source', target_query)] return f, internal_source, internal_target def _apply_view(dataset, process_groups, bundles, flow_selection): # What we want to warn about is flows between process_groups in the view_graph; they # are "used", since they appear in Elsewhere bundles, but the connection # isn't visible. used_edges = set() used_internal = set() used_process_groups = set() bundle_flows = {} table = dataset._table if flow_selection: table = table[eval_selection(table, '', flow_selection)] for k, bundle in bundles.items(): if bundle.from_elsewhere or bundle.to_elsewhere: continue # do these afterwards source = process_groups[bundle.source] target = process_groups[bundle.target] flows, internal_source, internal_target = \ find_flows(table, source.selection, target.selection, bundle.flow_selection) assert len(used_edges.intersection( flows.index.values)) == 0, 'duplicate bundle' bundle_flows[k] = flows used_edges.update(flows.index.values) used_process_groups.update(flows.source) used_process_groups.update(flows.target) # Also marked internal edges as "used" used_internal.update(internal_source.index.values) used_internal.update(internal_target.index.values) for k, bundle in bundles.items(): if bundle.from_elsewhere and bundle.to_elsewhere: raise ValueError('Cannot have flow from Elsewhere to Elsewhere') elif bundle.from_elsewhere: target = process_groups[bundle.target] flows, _, _ = find_flows(table, None, target.selection, bundle.flow_selection, used_edges) used_process_groups.add(bundle.target) elif bundle.to_elsewhere: source = process_groups[bundle.source] flows, _, _ = find_flows(table, source.selection, None, bundle.flow_selection, used_edges) used_process_groups.add(bundle.source) else: continue bundle_flows[k] = flows # XXX shouldn't this check processes in selections, not process groups? # Check set of process_groups relevant_flows = dataset._flows[dataset._flows.source.isin( used_process_groups) & dataset._flows.target.isin(used_process_groups)] unused_flows = relevant_flows[~relevant_flows.index.isin(used_edges) & ~relevant_flows.index.isin(used_internal)] return bundle_flows, unused_flows