Welcome to ARC’s documentation!

Documentation

AutoLinker

class arc.autolinker.AutoLinker(spark=None, catalog=None, schema=None, experiment_name=None, training_columns=None, deterministic_columns=None)

Class to create an arc AutoLinker object for automated data linking.

Parameters
  • spark (str) – Spark instance used

  • catalog (str) – Unity Catalog name, if used, for intermediate tables

  • schema (str) – Schema/database name for intermediate tables

  • experiment_name (str) – Location or simply name of the MLflow experiment for storing trial run results

  • training_columns (list) – Valid list of column names (or column names joined by ‘&’ for AND combinations) to be fixed during EM training

  • deterministic_columns (list) – Valid list of column names (or column names joined by ‘&’ for AND combinations) to be used for prior estimation

Return type

None

Basic usage:


>>> arc = AutoLinker(
...   catalog="splink_catalog",                # catalog name
...   schema="splink_schema",                  # schema to write results to
...   experiment_name="autosplink_experiment"  # MLflow experiment location
... )
>>> arc.auto_link(
...   data=data,                               # dataset to dedupe
...   attribute_columns=["A", "B", "C", "D"],  # columns that contain attributes to compare
...   unique_id="id",                          # column name of the unique ID
...   comparison_size_limit=500000,            # Maximum number of pairs when blocking applied
...   max_evals=100                            # Maximum number of hyperopt trials to run
... )
>>> arc.best_linker.m_u_parameters_chart()     # use Splink functionality out-of-the-box

_calculate_dataset_entropy(data, attribute_columns, by_cluster=False, base=0)

Method to calculate the average entropy of each attribute column in a dataset. Uses the custom arc_entropy_agg function from ARC’s SQL built-ins to efficiently calculate entropy across all columns.

Returns a dictionary with keys as columns and values as entropy in the columns.

Parameters
  • data (pyspark.sql.dataframe.DataFrame) – input dataframe with row per record

  • attribute_columns (list) – valid names of the columns to calculate entropy on

  • by_cluster (bool) – if True, it will calculate the average entropy when the dataset is split by clusters.

  • base (int) – base of the log in the entropy calculation

Return type

dict
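
The general idea can be sketched in plain PySpark. This is illustrative only: the real method delegates to ARC's arc_entropy_agg SQL built-in and also handles the by_cluster averaging.

# Illustrative sketch only - not ARC's arc_entropy_agg built-in.
import math

def column_entropy(df, column, base=2):
    """Shannon entropy of one column, computed from its value frequencies."""
    total = df.count()
    counts = [row["count"] for row in df.groupBy(column).count().collect()]
    return -sum((c / total) * math.log(c / total, base) for c in counts)

def dataset_entropy(df, attribute_columns, base=2):
    """Dict of column name -> entropy, mirroring the return shape described above."""
    return {c: column_entropy(df, c, base) for c in attribute_columns}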

_calculate_unsupervised_metrics(clusters, attribute_columns, base)

Method to calculate the chosen metric for unsupervised optimisation: the power ratio of two information gains, one computed with a log base equal to the number of clusters and one with a log base equal to the maximum number of unique values in any column of the original data. Information gain is defined as the difference between the entropy of the data points being matched and the average entropy of the clustered records once split.

Let the number of clusters in the matched subset of the data be \(c\)

Let the maximum number of unique values in any column in the original dataset be \(u\)

Then the “scaled” entropy of column \(k\), \(N\) unique values with probability \(P\) is

\(E_{s,k} = -\Sigma_{i}^{N} P_{i} \log_{c}(P_{i})\)

Then the “adjusted” entropy of column \(k\), \(N\) unique values with probability \(P\) is

\(E_{a,k} = -\Sigma_{i}^{N} P_{i} \log_{u}(P_{i})\)

The scaled information gain is

\(I_{s} = \Sigma_{k}^{K} (E_{s,k} - E'_{s,k})\)

and the adjusted information gain is

\(I_{a} = \Sigma_{k}^{K} (E_{a,k} - E'_{a,k})\)

where \(E'\) is the mean entropy of the individual clusters predicted.

The metric to optimise for is:

\(I_{s}^{I_{a}}\)

Returns information gain metric, as dictionary.

Parameters
  • clusters (pyspark.sql.dataframe.DataFrame) – dataframe returned from linker clustering

  • attribute_columns (list) – list of valid column names containing attributes in the clusters dataset

  • base (int) – the base for the log when calculating the adjusted entropy

Return type

dict
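
In code terms, the metric combines the two information gains as below. This is a hedged sketch with assumed helper names; the per-column entropies come from _calculate_dataset_entropy.

# Hedged sketch of the metric above; each entropy argument maps column name -> entropy.
def information_gain(entropy_matched, mean_entropy_by_cluster):
    """Sum over columns of (entropy of matched records - mean entropy within clusters)."""
    return sum(entropy_matched[k] - mean_entropy_by_cluster[k] for k in entropy_matched)

def unsupervised_metric(scaled, scaled_clustered, adjusted, adjusted_clustered):
    i_s = information_gain(scaled, scaled_clustered)      # entropies computed with base c
    i_a = information_gain(adjusted, adjusted_clustered)  # entropies computed with base u
    return i_s ** i_a                                     # the value AutoLinker optimises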

Method to convert hyperopt trials to a dictionary that can be used to train a linker model. Used for training the best linker model at the end of an experiment. Sets class attributes for best metric and parameters as well. Returns a dictionary.

Return type

dict

_create_attribute_columns(autolink_data, unique_id, true_label)

Called only when an autolink process is initiated; calculates which attribute columns to use and performs the necessary remapping work.

_create_comparison_list(space)

Method to convert comparisons dictionary generated by hyperopt to a Splink-compatible list of spark_comparison_library functions with the generated columns and thresholds.

Returns list of spark_comparison_library method instances.

Parameters

space (dict) – nested dictionary generated by create_comparison_dict

Return type

list
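
As an illustration of the conversion step, the sketch below builds plain tuples from an assumed space layout; the real method returns spark_comparison_library function instances instead, and the actual key names are produced by _create_hyperopt_space.

# Assumed space layout, for illustration only.
def comparison_list_from_space(space):
    comparisons = []
    for column, params in space.get("comparisons", {}).items():
        # e.g. ("surname", "levenshtein", 2) stands in for a spark_comparison_library call
        comparisons.append((column, params["distance_function"], params["threshold"]))
    return comparisons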

_create_hyperopt_space(data, attribute_columns, comparison_size_limit, sample_for_blocking_rules, max_columns_per_and_rule=2, max_rules_per_or_rule=3)

Method to create the hyperopt space for comparison and blocking rule hyperparameters from a list of columns. Takes a given (or generated) list of columns and produces a dictionary that can be converted to a comparison list later on, with function names and thresholds, and a list (of lists) of blocking rules to test.

Returns a dictionary for hyperopt parameter search

Parameters
  • data (pyspark.sql.dataframe.DataFrame) – input data with records per row

  • attribute_columns (list) – valid column names of data, containing all possible columns to compare

  • comparison_size_limit (int) – maximum number of pairs we want to compare, to limit hardware issues

  • max_columns_per_and_rule (int) – the maximum number of column comparisons in a single rule to try

  • max_rules_per_or_rule (int) – the maximum number of rules to combine in a composite OR rule to try

Return type

dict

_do_clustering()
_drop_intermediate_tables()

Method to drop intermediate tables for clean consecutive runs.

_evaluate_data_input_arg(data)
Parameters

data (Union[pyspark.sql.dataframe.DataFrame, list]) –

_generate_candidate_blocking_rules(data, attribute_columns, comparison_size_limit, sample_for_blocking_rules, max_columns_per_and_rule=2, max_rules_per_or_rule=3)

Method to automatically generate a list of lists of blocking rules to test, given a user-defined limit for pair-wise comparison size. Uses an approximate method built into ARC’s SQL functions.

Returns a nested list of lists with Splink-compatible blocking rule queries.

Parameters
  • data (pyspark.sql.dataframe.DataFrame) – input data with record-per-row

  • attribute_columns (list) – valid column names of data, containing all possible columns to block on

  • comparison_size_limit (int) – the maximum number of pairs we want to compare, to limit hardware issues

  • max_columns_per_and_rule (int) – the maximum number of column comparisons in a single rule to try

  • max_rules_per_or_rule (int) – the maximum number of rules to combine in a composite OR rule to try

Return type

list
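
To make the size limit concrete, a naive exact count of the pairs an equality blocking rule would generate (in the deduplication case) can be computed as below; ARC itself relies on an approximate SQL built-in rather than this exact aggregation.

# Naive exact pair count for an equality blocking rule in the dedupe case (sketch only).
from pyspark.sql import functions as F

def estimate_pairs(df, blocking_columns):
    group_sizes = df.groupBy(*blocking_columns).count()
    pairs = group_sizes.select(
        F.sum(F.col("count") * (F.col("count") - 1) / 2).alias("pairs")
    )
    return pairs.first()["pairs"]

# A rule like "l.city = r.city AND l.dob = r.dob" corresponds to blocking_columns=["city", "dob"].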

_generate_rules(columns)

Method to create a list of Splink-compatible SQL statements from a list of candidate columns for rules. Can be used for simple blocking rules (no AND statement), deterministic rules or for training rules.

Returns list of strings of Splink-compatible SQL statements.

Parameters

columns (list) – valid attribute columns in the data set

Return type

list
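
The ‘&’ convention from the constructor arguments maps naturally onto AND-ed equality clauses; a minimal sketch of that mapping (assumed to mirror the real behaviour):

# Minimal sketch: turn column names (optionally joined by '&') into Splink-style rules.
def generate_rules(columns):
    rules = []
    for column in columns:
        clauses = [f"l.{c} = r.{c}" for c in column.split("&")]
        rules.append(" AND ".join(clauses))
    return rules

# generate_rules(["surname", "surname&dob"])
# -> ["l.surname = r.surname", "l.surname = r.surname AND l.dob = r.dob"]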

_get_catalog(spark)
_get_rowcounts(linker_mode, autolink_data)
_get_schema(spark)
_get_spark(autolink_data)
Parameters

autolink_data (Union[pyspark.sql.dataframe.DataFrame, list]) –

_randomise_columns(attribute_columns)

Method to randomly select (combinations of) columns from the attribute columns for EM training. Will try to pick two combinations of two columns (i.e. AB and BC from ABC), but will default to the two available columns if there are only two.

Returns list of lists of strings.

Parameters

attribute_columns (list) – list of strings containing all attribute columns

Return type

list
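
A hedged sketch of the selection behaviour described above:

import random

# Sketch only: pick two overlapping pairs (e.g. AB and BC from ABC); with exactly two
# columns, fall back to that single pair.
def randomise_columns(attribute_columns):
    if len(attribute_columns) == 2:
        return [attribute_columns]
    a, b, c = random.sample(attribute_columns, 3)
    return [[a, b], [b, c]]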

_set_mlflow_experiment_name(spark)
_set_unique_id(autolink_data)

Method to run a series of Hyperopt trials; this is the auto_link entry point shown in the basic usage example above.

Parameters
  • data (Union[pyspark.sql.dataframe.DataFrame, list]) – Either a Spark DataFrame containing the data to be de-duplicated, or a list of 2 Spark Dataframes to be linked.

  • space – dictionary generated by hyperopt sampling

  • attribute_columns (Optional[list]) – list of strings containing all attribute columns

  • unique_id (Optional[str]) – string with the name of the unique ID column

  • comparison_size_limit (int) – int denoting maximum size of pairs allowed after blocking

  • max_evals (int) – int denoting max number of hyperopt trials to run

  • cleaning – string (“all” or “none”) or dictionary with keys as column names and values as list of strings for methods (accepted are “lower” and “alphanumeric_only”)

  • threshold (float) – float indicating the probability threshold above which a pair is considered a match

  • true_label (Optional[str]) – The name of the column with true record ids, if exists (default None) - if not None, auto_link will attempt to calculate empirical scores

  • random_seed (int) – Seed for Hyperopt fmin

  • sample_for_blocking_rules – boolean, default True. Whether to downsample the data to 10,000 records for blocking rule generation.

  • metric (str) –

Return type

None
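
For reference, the cleaning argument accepts either a string or a per-column mapping; an example of the dictionary form, with illustrative column names:

>>> cleaning = {
...   "first_name": ["lower", "alphanumeric_only"],  # accepted methods per column
...   "surname": ["lower"],
... }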

best_clusters_at_threshold(threshold=0.8)

Convert the pairwise predictions to clusters using the connected components algorithm.

Returns a Spark DataFrame of the clustered input data with a new column cluster_id prepended.

Parameters

threshold (float) – default 0.8. An optional parameter controlling the probability threshold at which records will be connected. Set it higher to produce clusters with greater precision, lower to produce clusters with greater recall.

Return type

pyspark.sql.dataframe.DataFrame
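
A typical call once auto_link has completed (illustrative):

>>> clusters = arc.best_clusters_at_threshold(threshold=0.9)
>>> clusters.groupBy("cluster_id").count().orderBy("count", ascending=False).show()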

best_predictions()

Get the predictions from the best linker model trained. Returns a Spark DataFrame of the pairwise predictions made by the AutoLinker model.

cluster_viewer()

Produce an interactive dashboard for visualising clusters. It provides examples of clusters of different sizes. The shape and size of clusters can be indicative of problems with record linkage, so it provides a tool to help you find potential false positive and negative links. See this video for an in-depth explanation of interpreting this dashboard: https://www.youtube.com/watch?v=DNvCMqjipis

Writes an HTML file to DBFS at “/dbfs/Users/{username}/scv.html”

comparison_viewer()

Produce an interactive dashboard for querying comparison details. See this video for an in-depth explanation of interpreting this dashboard: https://www.youtube.com/watch?v=DNvCMqjipis

Writes an HTML file to DBFS at “/dbfs/Users/{username}/scv.html”

Returns None.

Return type

None

estimate_clustering_columns(data, unique_id, true_label)

Use all values as attributes for deduping. This method is in place in case we want to do something different in future.

Parameters
  • data (pyspark.sql.dataframe.DataFrame) –

  • unique_id (str) –

  • true_label (str) –

estimate_linking_columns(data, unique_id, true_label)

This function estimates the attribute columns for linking 2 datasets. It does this by joining each dataset on each column to every other column, and choosing the pairing with the highest count.

Returns a mapping of the 2 tables to a standard schema.

Parameters
  • data (list) – the 2 Spark DataFrames which will be linked

  • unique_id (str) –

  • true_label (str) –

evaluate_linker(data, predictions, threshold, attribute_columns, unique_id, linker, true_label=None)

Method to evaluate predictions made by a trained linker model.

Returns a dictionary of evaluation metrics.

Parameters
  • data (Union[pyspark.sql.dataframe.DataFrame, list]) – Spark Dataframe containing the original dataset (required to establish ground truth labels)

  • predictions (splink.splink_dataframe.SplinkDataFrame) – Splink DataFrame as a result of calling linker.predict()

  • threshold (float) – float indicating the probability threshold above which a pair is considered a match

  • attribute_columns (list) – list of strings with valid column names to compare the entropies of

  • unique_id (str) – string with the name of the unique ID column

  • linker (splink.spark.linker.SparkLinker) – instance of splink.SparkLinker, used for deduplication (clustering)

  • true_label (Optional[str]) – The name of the column with true record ids, if exists (default None) - if not None, auto_link will attempt to calculate empirical scores

Return type

dict

get_PR_count(scores_df, unique_id)
get_RR_count(data_df, unique_id, true_label)

Get the underlying Splink model from the MLflow Model Registry. Useful for advanced users wishing to access Splink’s internal functionality not exposed as part of arc’s AutoLinker. Returns a Splink model object.

get_clustering_metrics(clusters, true_label)
get_confusion_metrics(data_df, predictions_df, thld, unique_id, true_label)
get_scores_df(data_df, predictions_df, unique_id, true_label)
match_weights_chart()

Get the match_weights_chart

Return type

None

train_and_evaluate_linker(data, space, attribute_columns, unique_id, deterministic_columns=None, training_columns=None, threshold=0.9, true_label=None)

Method to train and evaluate a linker model in one go

Returns a tuple of trained linker, predictions, metrics, parameters and mlflow run_id

Parameters
  • data (Union[pyspark.sql.dataframe.DataFrame, list]) – Spark DataFrame containing the data to be de-duplicated

  • space (dict) – dictionary generated by hyperopt sampling

  • attribute_columns (list) – list of strings containing all attribute columns

  • unique_id (str) – string with the name of the unique ID column

  • deterministic_columns (Optional[list]) – list of strings containing columns to block on - if None, they will be generated automatically/randomly

  • training_columns (Optional[list]) – list of strings containing training columns - if None, they will be generated automatically/randomly

  • threshold (float) – float indicating the probability threshold above which a pair is considered a match

  • true_label (Optional[str]) – The name of the column with true record ids, if exists (default None) - if not None, auto_link will attempt to calculate empirical scores

Return type

Tuple[splink.spark.linker.SparkLinker, splink.splink_dataframe.SplinkDataFrame, dict, dict, str]

train_linker(data, space, attribute_columns, unique_id, training_columns=None)

Method to train a linker model.

Returns a 2-tuple of trained linker object and Splink dataframe with predictions

Parameters
  • data (Union[pyspark.sql.dataframe.DataFrame, list]) – Spark DataFrame containing the data to be de-duplicated

  • space (dict) – dictionary generated by hyperopt sampling

  • attribute_columns (list) – list of strings containing all attribute columns

  • unique_id (str) – string with the name of the unique ID column

  • training_columns (Optional[list]) – list of strings containing training columns

Return type

Tuple[splink.spark.linker.SparkLinker, splink.splink_dataframe.SplinkDataFrame]

MLflow utilities

Wrapper classes

Wrapper class necessary for storing and retrieving the splink model JSON from the MLFlow model repository.

Loads linker settings json into context

Parameters

context (mlflow.pyfunc.model.PythonModelContext) – Python Model context instance

Return type

None

MLFlow compatibility wrapper for splink linker.

Loads artifacts from the specified PythonModelContext that can be used by predict() when evaluating inputs. When loading an MLflow model with load_model(), this method is called as soon as the PythonModel is constructed.

The same PythonModelContext will also be available during calls to predict(), but it may be more efficient to override this method and load artifacts from the context at model load time.

Parameters

context – A PythonModelContext instance containing artifacts that the model can use to perform inference.

Predict labels on provided data

Parameters

linker (splink.spark.linker.SparkLinker) –

Utility functions

Traverse a single comparison item from the Splink settings JSON and log the SQL condition the comparison was made under, along with the m and u values.

Parameters

comparison (dict) – dictionary element from a Splink settings object

Return type

None

Traverse the comparisons part of the splink settings and extract the learned values. This allows you to easily compare values across different training conditions.

Parameters

splink_model_json (dict) – the settings json from a splink model.

Return type

None

Simple method for extracting parameters from the splink settings and logging as parameters in MLFlow

Parameters

splink_model_json (dict) – the settings json from a splink model.

Return type

None

Log the splink settings json in its entirety under the name “linker.json”

Parameters

splink_model_json (dict) – the settings json from a splink model.

Return type

None

Simple method for logging a splink model

Parameters
  • linker (splink.spark.linker.SparkLinker) – Splink model object

  • model_name (str) – name to save model under

Return type

None

Comprehensive logging of Splink attributes, parameters, charts and model JSON to MLFlow to provide easy reproducibility and tracking during model development and deployment. This will create a new run under which to log everything. Additional parameters and metrics can be logged through the params and metrics arguments.

For details on MLFlow please visit https://mlflow.org/.

This will log:
  • All the m and u values trained, along with their training condition
  • All the hyperparameters set during training (blocking rules, link type, sql_dialect etc)

Parameters
  • run – an mlflow.start_run() instance

  • linker (splink.spark.linker.SparkLinker) – a trained Splink linkage model

  • model_name (str) – str, the name to log the model under

  • log_parameters_charts (bool) – boolean, whether to log parameter charts or not. Default to True as this is a quick operation.

  • log_profiling_charts (bool) – boolean, whether to log data profiling charts or not. Default to False as this requires running Spark jobs and can take time.

  • params (Optional[dict]) – dict[str: str]. Dictionary of param_name: param_value pairs. Optional argument for logging arbitrary parameters

  • metrics (Optional[dict]) – dict[str: double or int]. Dictionary of metric_name: metric_value pairs. Optional argument for logging arbitrary metrics

  • artifacts (Optional[dict]) – dict[str: str]. Dictionary of artifact_name: artifact_filepath pairs. Optional argument for logging arbitrary artifacts (NB - artifacts must be written to disk before logging).

Return type

None
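
A hedged usage sketch, assuming this is the log_splink_model_to_mlflow function referenced later on this page (its import path is not shown in this extract) and that linker is a trained SparkLinker:

>>> import mlflow
>>> with mlflow.start_run() as run:
...     log_splink_model_to_mlflow(          # import path for this helper assumed
...         run,
...         linker,                          # a trained Splink SparkLinker
...         model_name="my_splink_model",
...         params={"note": "baseline run"}, # optional arbitrary parameters to log
...     )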

Log all the non-data related charts to MLFlow

Parameters
  • linker (splink.spark.linker.SparkLinker) – a Splink linker object

  • log_parameters_charts (bool) – boolean, whether to log parameter charts or not

  • log_profiling_charts (bool) – boolean, whether to log data profiling charts or not

Return type

None

Save a chart to MLFlow. This writes the chart out temporarily for MLFlow to pick up and save.

Parameters
  • chart_name (str) – the name the chart will be given in MLFlow

  • chart – chart object from Splink

Returns a Splink model JSON from a model tracked with MLFlow. This can be retrieved either from a run or from the MLFlow Model Registry, if and only if the model was logged to MLFlow using the method splink.mlflow.log_splink_model_to_mlflow().

Extracting the linker’s JSON specification requires temporarily writing to /tmp on the local file system. All written files are deleted before the function returns.

Returns a splink setting json as a python dict.

Parameters

artifact_uri (str) – URI pointing to the artifacts, such as “runs:/500cf58bee2b40a4a82861cc31a617b1/my_model.pkl”, “models:/my_model/Production”, or “s3://my_bucket/my/file.txt”.

Return type

dict
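
A usage sketch with a hypothetical name, get_model_json, standing in for the retrieval function documented here (the real function name is not shown in this extract):

>>> settings = get_model_json("models:/my_model/Production")  # hypothetical function name
>>> settings["blocking_rules_to_generate_predictions"]        # keys follow Splink's settings schema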

Project Support

Please note that all projects in the industry-solutions github space are provided for your exploration only, and are not formally supported by Databricks with Service Level Agreements (SLAs). They are provided AS-IS and we do not make any guarantees of any kind. Please do not submit a support ticket relating to any issues arising from the use of these projects.

Any issues discovered through the use of this project should be filed as GitHub Issues on the Repo. They will be reviewed as time permits, but there are no formal SLAs for support.