Source code for d6tjoin.top1

import pandas as pd
import numpy as np
from collections import OrderedDict
import itertools
import warnings
import jellyfish

# ******************************************
# helpers
# ******************************************


from d6tjoin.utils import _applyFunMulticore, _filter_group_min, _set_values

[docs]class MergeTop1Diff(object): """ Top1 minimum difference join. Mostly used for strings. Helper for `MergeTop1`. """ def __init__(self, df1, df2, fuzzy_left_on, fuzzy_right_on, fun_diff=None, exact_left_on=None, exact_right_on=None, top_limit=None, topn=1, fun_preapply = None, fun_postapply = None, is_keep_debug=False, use_multicore=True): # check exact keys if not exact_left_on: exact_left_on = [] if not exact_right_on: exact_right_on = [] if not isinstance(fuzzy_left_on, (str,)) or not isinstance(fuzzy_right_on, (str,)): raise ValueError('fuzzy_on needs to be a string') if len(exact_left_on) != len(exact_right_on): raise ValueError('Need to pass same number of exact keys') if not isinstance(exact_left_on, (list)) or not isinstance(exact_right_on, (list)): raise ValueError('Exact keys need to be a list') if not callable(fun_diff): raise ValueError('fun_diff needs to a function') if (fun_preapply and fun_postapply) and (not callable(fun_preapply) or not callable(fun_postapply)): raise ValueError('fun_preapply and fun_postapply needs to a function') # use blocking index? if not exact_left_on and not exact_right_on: self.cfg_is_block = False elif exact_left_on and exact_right_on: self.cfg_is_block = True else: raise ValueError('Need to pass exact keys for both or neither dataframe') # store data self.dfs = [df1,df2] # store config self.cfg_fuzzy_left_on = fuzzy_left_on self.cfg_fuzzy_right_on = fuzzy_right_on self.cfg_exact_left_on = exact_left_on self.cfg_exact_right_on = exact_right_on self.cfg_fun_diff = fun_diff self.cfg_fun_preapply = fun_preapply self.cfg_fun_postapply = fun_postapply self.cfg_top_limit = top_limit self.cfg_is_keep_debug = is_keep_debug self.cfg_topn = topn self.cfg_use_multicore = use_multicore def _allpairs_candidates(self): values_left = _set_values(self.dfs[0], self.cfg_fuzzy_left_on) values_right = _set_values(self.dfs[1], self.cfg_fuzzy_right_on) if self.cfg_topn>1: values_left_exact = set() values_left_fuzzy = values_left else: values_left_exact = values_left.intersection(values_right) values_left_fuzzy = values_left.difference(values_right) # pre apply a function if self.cfg_fun_preapply: values_left_fuzzy = [self.cfg_fun_preapply(v) for v in values_left_fuzzy] values_right = [self.cfg_fun_preapply(v) for v in values_right] df_candidates_fuzzy = list(itertools.product(values_left_fuzzy, values_right)) df_candidates_fuzzy = pd.DataFrame(df_candidates_fuzzy,columns=['__top1left__','__top1right__']) df_candidates_fuzzy['__matchtype__'] = 'top1 left' df_candidates_exact = pd.DataFrame({'__top1left__': list(values_left_exact)}) df_candidates_exact['__top1right__'] = df_candidates_exact['__top1left__'] df_candidates_exact['__matchtype__'] = 'exact' df_candidates = df_candidates_exact.append(df_candidates_fuzzy, ignore_index=True) return df_candidates def _top1_diff_noblock(self): df_candidates = self._allpairs_candidates() idxSel = df_candidates['__matchtype__'] != 'exact' if self.cfg_use_multicore: df_candidates.loc[idxSel, '__top1diff__'] = _applyFunMulticore(df_candidates.loc[idxSel,'__top1left__'].values, df_candidates.loc[idxSel,'__top1right__'].values,self.cfg_fun_diff) else: df_candidates.loc[idxSel,'__top1diff__'] = df_candidates[idxSel].apply(lambda x: self.cfg_fun_diff(x['__top1left__'], x['__top1right__']), axis=1) df_candidates.loc[~idxSel, '__top1diff__'] = 0 has_duplicates = False if self.cfg_fun_postapply: df_candidates['__top1left__']=df_candidates['__top1left__'].apply(self.cfg_fun_postapply,1) df_candidates['__top1right__']=df_candidates['__top1right__'].apply(self.cfg_fun_postapply,1) df_diff = df_candidates.groupby('__top1left__',group_keys=False).apply(lambda x: _filter_group_min(x,'__top1diff__',self.cfg_topn)) if self.cfg_top_limit is not None: df_diff = df_diff[df_diff['__top1diff__']<=self.cfg_top_limit] has_duplicates = df_diff.groupby('__top1left__').size().max()>1 if has_duplicates: warnings.warn('Top1 join for %s has duplicates' %self.cfg_fuzzy_left_on) return df_diff, has_duplicates def _merge_top1_diff_noblock(self): df_diff, has_duplicates = self._top1_diff_noblock() dfjoin = self.dfs[0].merge(df_diff, left_on=self.cfg_fuzzy_left_on, right_on='__top1left__') dfjoin = dfjoin.merge(self.dfs[1], left_on='__top1right__', right_on=self.cfg_fuzzy_right_on, suffixes=['','__right__']) if not self.cfg_is_keep_debug: dfjoin = dfjoin[dfjoin.columns[~dfjoin.columns.str.startswith('__')]] return {'merged':dfjoin, 'top1':df_diff, 'duplicates':has_duplicates} def _top1_diff_withblock(self): def apply_gen_candidates_group(dfg): return pd.DataFrame(list(itertools.product(dfg['__top1left__'].values[0],dfg['__top1right__'].values[0])),columns=['__top1left__','__top1right__']) # find key unique values keysleft = self.dfs[0][self.cfg_exact_left_on+[self.cfg_fuzzy_left_on]].drop_duplicates().dropna() keysright = self.dfs[1][self.cfg_exact_right_on+[self.cfg_fuzzy_right_on]].drop_duplicates().dropna() keysleft = {tuple(x) for x in keysleft.values} keysright = {tuple(x) for x in keysright.values} values_left_exact = keysleft.intersection(keysright) values_left_fuzzy = keysleft.difference(keysright) df_keys_left_exact = pd.DataFrame(list(values_left_exact)) if not df_keys_left_exact.empty: df_keys_left_exact.columns = self.cfg_exact_left_on+['__top1left__'] df_keys_left_exact['__top1right__']=df_keys_left_exact['__top1left__'] df_keys_left_exact['__matchtype__'] = 'exact' df_keys_left_fuzzy = pd.DataFrame(list(values_left_fuzzy)) if not df_keys_left_fuzzy.empty: df_keys_left_fuzzy.columns = self.cfg_exact_left_on+[self.cfg_fuzzy_left_on] # fuzzy pair candidates df_keys_left = pd.DataFrame(df_keys_left_fuzzy.groupby(self.cfg_exact_left_on)[self.cfg_fuzzy_left_on].unique()) df_keys_right = pd.DataFrame(self.dfs[1].groupby(self.cfg_exact_right_on)[self.cfg_fuzzy_right_on].unique()) df_keysets_groups = df_keys_left.merge(df_keys_right, left_index=True, right_index=True) df_keysets_groups.columns = ['__top1left__', '__top1right__'] df_keysets_groups = df_keysets_groups.reset_index().groupby(self.cfg_exact_left_on).apply(apply_gen_candidates_group) df_keysets_groups = df_keysets_groups.reset_index(-1, drop=True).reset_index() df_keysets_groups = df_keysets_groups.dropna() df_candidates = df_keysets_groups[['__top1left__', '__top1right__']].drop_duplicates() if self.cfg_use_multicore: df_candidates['__top1diff__'] = _applyFunMulticore(df_candidates['__top1left__'].values, df_candidates['__top1right__'].values, self.cfg_fun_diff) else: df_candidates['__top1diff__'] = df_candidates.apply(lambda x: self.cfg_fun_diff(x['__top1left__'], x['__top1right__']), axis=1) df_candidates['__matchtype__'] = 'top1 left' # calculate difference df_diff = df_keysets_groups.merge(df_candidates, on=['__top1left__', '__top1right__']) df_diff = df_diff.append(df_keys_left_exact) df_diff['__top1diff__']=df_diff['__top1diff__'].fillna(0) # exact keys df_diff = df_diff.groupby(self.cfg_exact_left_on+['__top1left__'],group_keys=False).apply(lambda x: _filter_group_min(x,'__top1diff__')) if self.cfg_top_limit is not None: df_diff = df_diff[df_diff['__top1diff__']<=self.cfg_top_limit] has_duplicates = df_diff.groupby(self.cfg_exact_left_on+['__top1left__']).size().max()>1 return df_diff, has_duplicates def _merge_top1_diff_withblock(self): df_diff, has_duplicates = self._top1_diff_withblock() dfjoin = self.dfs[0].merge(df_diff, left_on=self.cfg_exact_left_on+[self.cfg_fuzzy_left_on], right_on=self.cfg_exact_left_on+['__top1left__']) # todo: add exact join keys dfjoin = dfjoin.merge(self.dfs[1], left_on=self.cfg_exact_left_on+['__top1right__'], right_on=self.cfg_exact_right_on+[self.cfg_fuzzy_right_on], suffixes=['','__right__']) if not self.cfg_is_keep_debug: dfjoin = dfjoin[dfjoin.columns[~dfjoin.columns.str.startswith('__')]] return {'merged':dfjoin, 'top1':df_diff, 'duplicates':has_duplicates}
[docs] def top1_diff(self): if self.cfg_is_block: return self._top1_diff_withblock() else: return self._top1_diff_noblock()
[docs] def merge(self): if not self.cfg_exact_left_on and not self.cfg_exact_right_on: return self._merge_top1_diff_noblock() elif self.cfg_exact_left_on and self.cfg_exact_right_on: return self._merge_top1_diff_withblock() else: raise ValueError('Need to pass exact keys for both or neither dataframe')
[docs]class MergeTop1Number(object): """ Top1 minimum difference join for numbers. Helper for `MergeTop1`. """ def __init__(self, df1, df2, fuzzy_left_on, fuzzy_right_on, exact_left_on=None, exact_right_on=None, direction='nearest', top_limit=None, is_keep_debug=False): # check exact keys if not exact_left_on: exact_left_on = [] if not exact_right_on: exact_right_on = [] if len(exact_left_on) != len(exact_right_on): raise ValueError('Need to pass same number of exact keys') if not isinstance(exact_left_on, (list)) or not isinstance(exact_right_on, (list)): raise ValueError('Exact keys need to be a list') # use blocking index? if not exact_left_on and not exact_right_on: self.cfg_is_block = False elif exact_left_on and exact_right_on: self.cfg_is_block = True else: raise ValueError('Need to pass exact keys for both or neither dataframe') # store data self.dfs = [df1,df2] # store config self.cfg_fuzzy_left_on = fuzzy_left_on self.cfg_fuzzy_right_on = fuzzy_right_on self.cfg_exact_left_on = exact_left_on self.cfg_exact_right_on = exact_right_on self.cfg_direction = direction self.cfg_top_limit = top_limit self.cfg_is_keep_debug = is_keep_debug def _top1_diff_withblock(self): # unique values df_keys_left = self.dfs[0].groupby(self.cfg_exact_left_on)[self.cfg_fuzzy_left_on].apply(lambda x: pd.Series(x.unique())) df_keys_left.index = df_keys_left.index.droplevel(-1) df_keys_left = pd.DataFrame(df_keys_left) df_keys_right = self.dfs[1].groupby(self.cfg_exact_right_on)[self.cfg_fuzzy_right_on].apply(lambda x: pd.Series(x.unique())) df_keys_right.index = df_keys_right.index.droplevel(-1) df_keys_right = pd.DataFrame(df_keys_right) # todo: global consolidation like with MergeTop1Diff # sort df_keys_left = df_keys_left.sort_values(self.cfg_fuzzy_left_on).reset_index().rename(columns={self.cfg_fuzzy_left_on:'__top1left__'}) df_keys_right = df_keys_right.sort_values(self.cfg_fuzzy_right_on).reset_index().rename(columns={self.cfg_fuzzy_right_on:'__top1right__'}) # merge df_diff = pd.merge_asof(df_keys_left, df_keys_right, left_on='__top1left__', right_on='__top1right__', left_by=self.cfg_exact_left_on, right_by=self.cfg_exact_right_on, direction=self.cfg_direction) df_diff['__top1diff__'] = (df_diff['__top1left__']-df_diff['__top1right__']).abs() df_diff['__matchtype__'] = 'top1 left' df_diff.loc[df_diff['__top1left__'] == df_diff['__top1right__'], '__matchtype__'] = 'exact' if self.cfg_top_limit is not None: df_diff = df_diff[df_diff['__top1diff__']<=self.cfg_top_limit] return df_diff def _top1_diff_noblock(self): # uniques values_left = _set_values(self.dfs[0], self.cfg_fuzzy_left_on) values_right = _set_values(self.dfs[1], self.cfg_fuzzy_right_on) # sort df_keys_left = pd.DataFrame({'__top1left__':list(values_left)}).sort_values('__top1left__') df_keys_right = pd.DataFrame({'__top1right__':list(values_right)}).sort_values('__top1right__') # merge df_diff = pd.merge_asof(df_keys_left, df_keys_right, left_on='__top1left__', right_on='__top1right__', direction=self.cfg_direction) df_diff['__top1diff__'] = (df_diff['__top1left__']-df_diff['__top1right__']).abs() df_diff['__matchtype__'] = 'top1 left' df_diff.loc[df_diff['__top1left__'] == df_diff['__top1right__'], '__matchtype__'] = 'exact' return df_diff
[docs] def top1_diff(self): if self.cfg_is_block: return self._top1_diff_withblock() else: return self._top1_diff_noblock()
[docs] def merge(self): df_diff = self.top1_diff() dfjoin = self.dfs[0].merge(df_diff, left_on=self.cfg_exact_left_on+[self.cfg_fuzzy_left_on], right_on=self.cfg_exact_left_on+['__top1left__']) dfjoin = dfjoin.merge(self.dfs[1], left_on=self.cfg_exact_left_on+['__top1right__'], right_on=self.cfg_exact_right_on+[self.cfg_fuzzy_right_on], suffixes=['','__right__']) if not self.cfg_is_keep_debug: dfjoin = dfjoin[dfjoin.columns[~dfjoin.columns.str.startswith('__')]] return {'merged': dfjoin, 'top1': df_diff, 'duplicates': None}
[docs]class MergeTop1(object): """ Left best match join. It applies a difference function to find the key pair with the smallest difference to the join key. Args: df1 (dataframe): left dataframe onto which the right dataframe is joined df2 (dataframe): right dataframe fuzzy_left_on (list): join keys for similarity match, left dataframe fuzzy_right_on (list): join keys for similarity match, right dataframe exact_left_on (list, default None): join keys for exact match, left dataframe exact_right_on (list, default None): join keys for exact match, right dataframe fun_diff (list, default None): list of difference functions to be applied for each fuzzy key top_limit (list, default None): list of values to cap similarity matches is_keep_debug (bool): keep diagnostics columns, good for debugging Note: * fun_diff: applies the difference function to find the best match with minimum distance * By default gets automatically determined depending on whether you have a string or date/number * Use `None` to keep the default, so example [None, lambda x, y: x-y] * Functions within list get applied in order same order to fuzzy join keys * Needs to be a difference function so lower is better. For functions like Jaccard higher is better so you need to adjust for that * top_limit: Limits the number of matches to anything below that values. For example if two strings differ by 3 but top_limit is 2, that match will be ignored * for dates you can use `pd.offsets.Day(1)` or similar """ def __init__(self, df1, df2, fuzzy_left_on=None, fuzzy_right_on=None, exact_left_on=None, exact_right_on=None, fun_diff = None, top_limit=None, is_keep_debug=False, use_multicore=True): # todo: pass custom merge asof param # todo: pass list of fundiff # check fuzzy keys if not fuzzy_left_on or not fuzzy_right_on: raise ValueError('Need to pass fuzzy left and right keys') if len(fuzzy_left_on) != len(fuzzy_right_on): raise ValueError('Need to pass same number of fuzzy left and right keys') self.cfg_njoins_fuzzy = len(fuzzy_left_on) # check exact keys if not exact_left_on: exact_left_on = [] if not exact_right_on: exact_right_on = [] if len(exact_left_on) != len(exact_right_on): raise ValueError('Need to pass same number of exact keys') if not isinstance(exact_left_on, (list)) or not isinstance(exact_right_on, (list)): raise ValueError('Exact keys need to be a list') # use blocking index? if not exact_left_on and not exact_right_on: self.cfg_is_block = False elif exact_left_on and exact_right_on: self.cfg_is_block = True else: raise ValueError('Need to pass exact keys for both or neither dataframe') # check custom params if not top_limit: top_limit = [None,]*self.cfg_njoins_fuzzy if not fun_diff: fun_diff = [None,]*self.cfg_njoins_fuzzy elif len(fun_diff)!=len(fuzzy_left_on): raise ValueError('fun_diff needs to the same length as fuzzy_left_on. Use None in list to use default') if not isinstance(top_limit, (list,)) or not len(top_limit)==self.cfg_njoins_fuzzy: raise NotImplementedError('top_limit needs to a list with entries for each fuzzy join key') if not isinstance(fun_diff, (list,)) or not len(top_limit)==self.cfg_njoins_fuzzy: raise NotImplementedError('fun_diff needs to a list with entries for each fuzzy join key') # store data self.dfs = [df1,df2] # store config self.cfg_fuzzy_left_on = fuzzy_left_on self.cfg_fuzzy_right_on = fuzzy_right_on # todo: exact keys by fuzzy key? or just global? self.cfg_exact_left_on = exact_left_on self.cfg_exact_right_on = exact_right_on self.cfg_top_limit = top_limit self.cfg_fun_diff = fun_diff self.cfg_is_keep_debug = is_keep_debug self.cfg_use_multicore = use_multicore
[docs] def merge(self): """ Executes merge Returns: dict: keys 'merged' has merged dataframe, 'top1' has best matches by fuzzy_left_on. See example notebooks for details """ df_diff_bylevel = OrderedDict() self.dfjoined = self.dfs[0].copy() cfg_exact_left_on = self.cfg_exact_left_on cfg_exact_right_on = self.cfg_exact_right_on a=1 for ilevel, ikey in enumerate(self.cfg_fuzzy_left_on): keyleft = ikey keyright = self.cfg_fuzzy_right_on[ilevel] typeleft = self.dfs[0][keyleft].dtype if self.cfg_fun_diff[ilevel]: df_diff_bylevel[ikey] = MergeTop1Diff(self.dfjoined, self.dfs[1], keyleft, keyright, self.cfg_fun_diff[ilevel], cfg_exact_left_on, cfg_exact_right_on, top_limit=self.cfg_top_limit[ilevel], use_multicore=self.cfg_use_multicore).top1_diff()[0] else: if typeleft == 'int64' or typeleft == 'float64' or typeleft == 'datetime64[ns]': df_diff_bylevel[ikey] = MergeTop1Number(self.dfjoined, self.dfs[1], keyleft, keyright, cfg_exact_left_on, cfg_exact_right_on, top_limit=self.cfg_top_limit[ilevel]).top1_diff() elif typeleft == 'object' and type(self.dfs[0][keyleft].values[0])==str: df_diff_bylevel[ikey] = MergeTop1Diff(self.dfjoined, self.dfs[1], keyleft, keyright, jellyfish.levenshtein_distance, cfg_exact_left_on, cfg_exact_right_on, top_limit=self.cfg_top_limit[ilevel], use_multicore=self.cfg_use_multicore).top1_diff()[0] # todo: handle duplicates else: raise ValueError('Unrecognized data type for top match, need to pass fun_diff in arguments') self.dfjoined = self.dfjoined.merge(df_diff_bylevel[ikey], left_on=cfg_exact_left_on+[keyleft], right_on=cfg_exact_left_on+['__top1left__'], suffixes=['',keyleft]) cfg_col_rename = ['__top1left__','__top1right__','__top1diff__','__matchtype__'] self.dfjoined = self.dfjoined.rename(columns=dict((k,k+keyleft) for k in cfg_col_rename)) cfg_exact_left_on += ['__top1right__%s'%keyleft,] cfg_exact_right_on += [keyright,] self.dfjoined = self.dfjoined.merge(self.dfs[1], left_on=cfg_exact_left_on, right_on=cfg_exact_right_on, suffixes=['','_right']) if not self.cfg_is_keep_debug: self.dfjoined = self.dfjoined[self.dfjoined.columns[~self.dfjoined.columns.str.startswith('__')]] return {'merged': self.dfjoined, 'top1': df_diff_bylevel, 'duplicates': None}
''' multikey: want to merge left match onto right df dont to numbers (non key) join until the very end '''