Welcome to ARC’s documentation!
Documentation¶
AutoLinker¶
- class arc.autolinker.AutoLinker(spark=None, catalog=None, schema=None, experiment_name=None, training_columns=None, deterministic_columns=None)¶
Class to create an AutoLinker object for automated data linking.
- Parameters
spark (str) – Spark instance used
catalog (str) – Unity Catalog name, if used, for intermediate tables
schema (str) – Schema/database name for intermediate tables
experiment_name (str) – Location or simply name of the MLflow experiment for storing trial run results
training_columns (list) – Valid list of column names (or names joined by ‘&’ for AND combinations) to be fixed during EM training
deterministic_columns (list) – Valid list of column names (or names joined by ‘&’ for AND combinations) to be used for prior estimation
- Return type
None
Basic usage:
>>> arc = AutoLinker(
...     catalog="splink_catalog",                # catalog name
...     schema="splink_schema",                  # schema to write results to
...     experiment_name="autosplink_experiment"  # MLflow experiment location
... )
>>> arc.auto_link(
...     data=data,                               # dataset to dedupe
...     attribute_columns=["A", "B", "C", "D"],  # columns that contain attributes to compare
...     unique_id="id",                          # column name of the unique ID
...     comparison_size_limit=500000,            # maximum number of pairs when blocking applied
...     max_evals=100                            # maximum number of hyperopt trials to run
... )
>>> arc.best_linker.m_u_parameters_chart()       # use Splink functionality out-of-the-box
- _calculate_dataset_entropy(data, attribute_columns, by_cluster=False, base=0)¶
Method to calculate the average entropy of each attribute column in a dataset. Uses the custom arc_entropy_agg function from ARC’s SQL built-ins to efficiently calculate entropy across all columns.
Returns a dictionary with keys as columns and values as entropy in the columns.
- Parameters
data (pyspark.sql.dataframe.DataFrame) – input dataframe with row per record
by_cluster (bool) – if True, it will calculate the average entropy when the dataset is split by clusters.
base (int) – base of the log in the entropy calculation
attribute_columns (list) – valid names of the columns to calculate entropy on
- Return type
dict
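Conceptually, the per-column value is standard Shannon entropy in the requested log base; the real implementation computes it in Spark via ARC’s arc_entropy_agg SQL built-in, but a minimal pure-Python sketch of the idea (the function name below is illustrative, not part of the API) is:
>>> import math
>>> from collections import Counter
>>> def column_entropy(values, base):
...     """Shannon entropy of a list of column values in the given log base."""
...     counts = Counter(values)
...     n = len(values)
...     return -sum((c / n) * math.log(c / n, base) for c in counts.values())
...
>>> column_entropy(["a", "a", "b", "c"], base=2)
1.5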
- _calculate_unsupervised_metrics(clusters, attribute_columns, base)¶
Method to calculate the chosen metric for unsupervised optimisation: the power ratio of information gains, calculated once with a log base equal to the number of clusters and once with a log base equal to the maximum number of unique values in any column of the original data. Information gain is defined as the difference between the entropy of the data points being matched and the mean entropy of those records once split into clusters.
Let the number of clusters in the matched subset of the data be \(c\)
Let the maximum number of unique values in any column in the original dataset be \(u\)
Then the “scaled” entropy of column \(k\) with \(N\) unique values occurring with probabilities \(P_{i}\) is
\(E_{s,k} = -\sum_{i=1}^{N} P_{i} \log_{c}(P_{i})\)
and the “adjusted” entropy of column \(k\) is
\(E_{a,k} = -\sum_{i=1}^{N} P_{i} \log_{u}(P_{i})\)
The scaled information gain is
\(I_{s} = \sum_{k=1}^{K} \left( E_{s,k} - E'_{s,k} \right)\)
and the adjusted information gain is
\(I_{a} = \sum_{k=1}^{K} \left( E_{a,k} - E'_{a,k} \right)\)
where \(E'\) denotes the mean entropy of the individual predicted clusters.
The metric to optimise for is
\(I_{s}^{I_{a}}\)
Returns information gain metric, as dictionary.
- Parameters
clusters (pyspark.sql.dataframe.DataFrame) – dataframe returned from linker clustering
attribute_columns (list) – list of valid column names containing attributes in the clusters dataset
base (int) – the base for the log when calculating the adjusted entropy
- Return type
dict
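To make the optimisation target concrete, here is a hedged pure-Python sketch using illustrative per-column entropy values (the variable names are not part of the API; in practice the entropies come from _calculate_dataset_entropy):
>>> # entropies of the matched records, per attribute column
>>> E_s = {"name": 0.90, "city": 0.60}            # scaled: log base c (number of clusters)
>>> E_a = {"name": 0.45, "city": 0.30}            # adjusted: log base u (max unique values)
>>> # mean entropies of the same columns within the predicted clusters
>>> E_s_clustered = {"name": 0.20, "city": 0.10}
>>> E_a_clustered = {"name": 0.10, "city": 0.05}
>>> I_s = sum(E_s[k] - E_s_clustered[k] for k in E_s)   # scaled information gain
>>> I_a = sum(E_a[k] - E_a_clustered[k] for k in E_a)   # adjusted information gain
>>> round(I_s ** I_a, 3)                                 # information_gain_power_ratio
1.116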
- _convert_hyperopt_to_splink()¶
Method to convert hyperopt trials to a dictionary that can be used to train a linker model. Used for training the best linker model at the end of an experiment. Sets class attributes for best metric and parameters as well. Returns a dictionary.
- Return type
dict
- _create_attribute_columns(autolink_data, unique_id, true_label)¶
Called only when an autolink process is initiated; this function calculates which attribute columns to use and does the necessary remapping work.
- _create_comparison_list(space)¶
Method to convert comparisons dictionary generated by hyperopt to a Splink-compatible list of spark_comparison_library functions with the generated columns and thresholds.
Returns list of spark_comparison_library method instances.
- Parameters
space (dict) – nested dictionary generated by create_comparison_dict
- Return type
list
- _create_hyperopt_space(data, attribute_columns, comparison_size_limit, sample_for_blocking_rules, max_columns_per_and_rule=2, max_rules_per_or_rule=3)¶
Method to create the hyperopt space for comparison and blocking rule hyperparameters from a list of columns. Takes a given (or generated) list of columns and produces a dictionary that can be converted to a comparison list later on, with function names and thresholds, and a list (of lists) of blocking rules to test.
Returns a dictionary for hyperopt parameter search
- Parameters
data (pyspark.sql.dataframe.DataFrame) – input data with records per row
attribute_columns (list) – valid column names of data, containing all possible columns to compare
comparison_size_limit (int) – maximum number of pairs we want to compare, to limit hardware issues
max_columns_per_and_rule (int) – the maximum number of column comparisons in a single rule to try
max_rules_per_or_rule (int) – the maximum number of rules to combine in a composite OR rule to try
- Return type
dict
- _do_clustering()¶
- _drop_intermediate_tables()¶
Method to drop intermediate tables for clean consecutive runs.
- _evaluate_data_input_arg(data)¶
- Parameters
data (Union[pyspark.sql.dataframe.DataFrame, list]) –
- _generate_candidate_blocking_rules(data, attribute_columns, comparison_size_limit, sample_for_blocking_rules, max_columns_per_and_rule=2, max_rules_per_or_rule=3)¶
Method to automatically generate a list of lists of blocking rules to test, given a user-defined limit for pair-wise comparison size. Uses an approximation method built into ARC’s SQL functions.
Returns a nested list of lists with Splink-compatible blocking rule queries.
- Parameters
data (pyspark.sql.dataframe.DataFrame) – input data with record-per-row
attribute_columns (list) – valid column names of data, containing all possible columns to block on
comparison_size_limit (int) – the maximum number of pairs we want to compare, to limit hardware issues
max_columns_per_and_rule (int) – the maximum number of column comparisons in a single rule to try
max_rules_per_or_rule (int) – the maximum number of rules to combine in a composite OR rule to try
- Return type
list
- _generate_rules(columns)¶
Method to create a list of Splink-compatible SQL statements from a list of candidate columns for rules. Can be used for simple blocking rules (no AND statement), deterministic rules or for training rules.
Returns list of strings of Splink-compatible SQL statements.
- Parameters
columns (list) – valid attribute columns in the data set
- Return type
list
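As an illustration of the output format: Splink-compatible rules are SQL fragments that compare the left- and right-hand record of a candidate pair. The exact strings produced may differ; the example below is a hedged sketch only.
>>> # _generate_rules is a private method; shown here purely for illustration.
>>> # A single-column rule typically looks like "l.A = r.A", and a column
>>> # combination becomes an AND condition such as "l.A = r.A AND l.B = r.B".
>>> rules = arc._generate_rules(["A", "B"])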
- _get_catalog(spark)¶
- _get_rowcounts(linker_mode, autolink_data)¶
- _get_schema(spark)¶
- _get_spark(autolink_data)¶
- Parameters
autolink_data (Union[pyspark.sql.dataframe.DataFrame, list]) –
- _randomise_columns(attribute_columns)¶
Method to randomly select (combinations of) columns from the attribute columns for EM training. Will try to pick two combinations of two columns (e.g. A&B and B&C from columns A, B and C), but will default to the two columns themselves if only two are available.
Returns list of lists of strings.
- Parameters
attribute_columns (list) – list of strings containing all attribute columns
- Return type
list
- _set_mlflow_experiment_name(spark)¶
- _set_unique_id(autolink_data)¶
- auto_link(data, attribute_columns=None, unique_id=None, comparison_size_limit=100000, max_evals=5, cleaning='all', threshold=0.9, true_label=None, random_seed=42, metric='information_gain_power_ratio', sample_for_blocking_rules=True)¶
Method to run a series of hyperopt trials.
- Parameters
data (Union[pyspark.sql.dataframe.DataFrame, list]) – Either a Spark DataFrame containing the data to be de-duplicated, or a list of 2 Spark Dataframes to be linked.
attribute_columns (Optional[list]) – list of strings containing all attribute columns
unique_id (Optional[str]) – string with the name of the unique ID column
comparison_size_limit (int) – int denoting maximum size of pairs allowed after blocking
max_evals (int) – int denoting max number of hyperopt trials to run
cleaning – either a string (“all” or “none”) or a dictionary with column names as keys and lists of cleaning methods as values (accepted methods are “lower” and “alphanumeric_only”)
threshold (float) – float indicating the probability threshold above which a pair is considered a match
true_label (Optional[str]) – The name of the column with true record ids, if exists (default None) - if not None, auto_link will attempt to calculate empirical scores
random_seed (int) – Seed for Hyperopt fmin
sample_for_blocking_rules – boolean, default True. Whether to down-sample the data to 10,000 records for blocking rule generation.
metric (str) – the metric to optimise for, default “information_gain_power_ratio”
- Return type
None
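Because data accepts either a single Spark DataFrame (deduplication) or a list of two Spark DataFrames (linking), here is a hedged sketch of the linking case; the DataFrame and column names are illustrative placeholders:
>>> arc = AutoLinker()
>>> arc.auto_link(
...     data=[df_left, df_right],                      # two Spark DataFrames to link
...     attribute_columns=["name", "address", "dob"],  # illustrative attribute columns
...     unique_id="id",                                # unique ID column present in both
...     comparison_size_limit=100000,                  # cap on pairs after blocking
...     max_evals=20,                                  # number of hyperopt trials
...     cleaning="all",                                # apply all cleaning methods
...     threshold=0.9                                  # match probability threshold
... )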
- best_clusters_at_threshold(threshold=0.8)¶
Convert the pairwise predictions to clusters using the connected components algorithm.
Returns a Spark dataframe of the clustered input data with a new column cluster_id prepended.
- Parameters
threshold (float) – optional, default 0.8. Controls the threshold at which records will be connected; set it higher to produce clusters with greater precision, lower to produce clusters with greater recall.
- Return type
pyspark.sql.dataframe.DataFrame
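For example, once auto_link has finished (the threshold value is illustrative):
>>> clusters = arc.best_clusters_at_threshold(threshold=0.9)
>>> clusters.show()   # clustered input data with a cluster_id column prepended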
- best_predictions()¶
Get the predictions from the best linker model trained. Returns a spark dataframe of the pairwise predictions made by the autolinker model.
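For example, to inspect the pairwise scores produced by the best model:
>>> predictions = arc.best_predictions()
>>> predictions.show()   # pairwise predictions with match probabilities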
- cluster_viewer()¶
Produce an interactive dashboard for visualising clusters. It provides examples of clusters of different sizes. The shape and size of clusters can be indicative of problems with record linkage, so it provides a tool to help you find potential false positive and negative links. See this video for an in-depth explanation of how to interpret this dashboard: https://www.youtube.com/watch?v=DNvCMqjipis
Writes an HTML file to DBFS at “/dbfs/Users/{username}/scv.html”
- comparison_viewer()¶
Produce an interactive dashboard for querying comparison details. See this video for an in-depth explanation of how to interpret this dashboard: https://www.youtube.com/watch?v=DNvCMqjipis
Writes an HTML file to DBFS at “/dbfs/Users/{username}/scv.html”
Returns None.
- Return type
None
- estimate_clustering_columns(data, unique_id, true_label)¶
Use all values as attributes for deduping. This method is in place in case we want to do something different in future.
- Parameters
data (pyspark.sql.dataframe.DataFrame) –
unique_id (str) –
true_label (str) –
- estimate_linking_columns(data, unique_id, true_label)¶
This function estimates the attribute columns for linking 2 datasets. It does this by joining each dataset on each column to every other column, and choosing the pairing with the highest count.
Returns a mapping of the 2 tables to a standard schema.
- Parameters
data (list) – the 2 Spark dataframes which will be linked
unique_id (str) –
true_label (str) –
- evaluate_linker(data, predictions, threshold, attribute_columns, unique_id, linker, true_label=None)¶
Method to evaluate predictions made by a trained linker model.
Returns a dictionary of evaluation metrics.
- Parameters
data (Union[pyspark.sql.dataframe.DataFrame, list]) – Spark Dataframe containing the original dataset (required to establish ground truth labels)
predictions (splink.splink_dataframe.SplinkDataFrame) – Splink DataFrame as a result of calling linker.predict()
threshold (float) – float indicating the probability threshold above which a pair is considered a match
attribute_columns (list) – list of strings with valid column names to compare the entropies of
unique_id (str) – string with the name of the unique ID column
linker (splink.spark.linker.SparkLinker) – instance of splink.SparkLinker, used for deduplication (clustering)
true_label (Optional[str]) – The name of the column with true record ids, if exists (default None) - if not None, auto_link will attempt to calculate empirical scores
- Return type
dict
- get_PR_count(scores_df, unique_id)¶
- get_RR_count(data_df, unique_id, true_label)¶
- get_best_splink_model()¶
Get the underlying Splink model from the MLFlow Model Registry. Useful for advanced users wishing to access Splink’s internal functionality not exposed as part of arc Autolinker. Returns a Splink model object.
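A hedged example of dropping down to Splink directly once a model has been trained and registered:
>>> splink_model = arc.get_best_splink_model()
>>> # splink_model is a Splink model object; from here you can use Splink's own
>>> # API (settings inspection, charts, etc.) as documented by Splink.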
- get_clustering_metrics(clusters, true_label)¶
- get_confusion_metrics(data_df, predictions_df, thld, unique_id, true_label)¶
- get_scores_df(data_df, predictions_df, unique_id, true_label)¶
- match_weights_chart()¶
Get the match_weights_chart
- Return type
None
- train_and_evaluate_linker(data, space, attribute_columns, unique_id, deterministic_columns=None, training_columns=None, threshold=0.9, true_label=None)¶
Method to train and evaluate a linker model in one go.
Returns a tuple of the trained linker, predictions, metrics, parameters and MLflow run_id.
- Parameters
data (Union[pyspark.sql.dataframe.DataFrame, list]) – Spark DataFrame containing the data to be de-duplicated
space (dict) – dictionary generated by hyperopt sampling
attribute_columns (list) – list of strings containing all attribute columns
unique_id (str) – string with the name of the unique ID column
deterministic_columns (Optional[list]) – list of strings containing columns to block on - if None, they will be generated automatically/randomly
training_columns (Optional[list]) – list of strings containing training columns - if None, they will be generated automatically/randomly
threshold (float) – float indicating the probability threshold above which a pair is considered a match
true_label (Optional[str]) – The name of the column with true record ids, if exists (default None) - if not None, auto_link will attempt to calculate empirical scores
- Return type
Tuple[splink.spark.linker.SparkLinker, splink.splink_dataframe.SplinkDataFrame, dict, dict, str]
- train_linker(data, space, attribute_columns, unique_id, training_columns=None)¶
Method to train a linker model.
Returns a 2-tuple of trained linker object and Splink dataframe with predictions
- Parameters
data (Union[pyspark.sql.dataframe.DataFrame, list]) – Spark DataFrame containing the data to be de-duplicated
space (dict) – dictionary generated by hyperopt sampling
attribute_columns (list) – list of strings containing all attribute columns
unique_id (str) – string with the name of the unique ID column
training_columns (Optional[list]) – list of strings containing training columns
- Return type
Tuple[splink.spark.linker.SparkLinker, splink.splink_dataframe.SplinkDataFrame]
MLflow utilities¶
Wrapper classes¶
- class arc.autolinker.splink_mlflow.splinkSparkMLFlowWrapper¶
Wrapper class necessary for storing and retrieving the splink model JSON from the MLFlow model repository.
- load_context(context)¶
Loads linker settings json into context
- Parameters
context (mlflow.pyfunc.model.PythonModelContext) – Python Model context instance
- Return type
None
- class arc.autolinker.splink_mlflow.SplinkMLFlowWrapper(**kwargs)¶
MLFlow compatibility wrapper for splink linker.
- clear_context()¶
- get_linker()¶
- get_settings()¶
- get_settings_object()¶
- load_context(context)¶
Loads artifacts from the specified PythonModelContext that can be used by predict() when evaluating inputs. When loading an MLflow model with load_model(), this method is called as soon as the PythonModel is constructed. The same PythonModelContext will also be available during calls to predict(), but it may be more efficient to override this method and load artifacts from the context at model load time.
- Parameters
context – A PythonModelContext instance containing artifacts that the model can use to perform inference.
- predict(context, X)¶
Predict labels on provided data
- set_settings(settings)¶
- set_should_evaluate(flag)¶
- set_spark_linker(linker)¶
- Parameters
linker (splink.spark.linker.SparkLinker) –
- spark_linker(data)¶
Utility functions¶
- arc.autolinker.splink_mlflow._log_comparison_details(comparison)¶
Traverse a single comparison item from the splink settings JSON and log the SQL condition the comparison was made under, along with the m and u values.
- Parameters
comparison (dict) – dictionary element from a Splink settings object
- Return type
None
- arc.autolinker.splink_mlflow._log_comparisons(splink_model_json)¶
Traverse the comparisons part of the splink settings and extract the learned values. This allows you to easily compare values across different training conditions.
- Parameters
splink_model_json (dict) – the settings json from a splink model.
- Return type
None
- arc.autolinker.splink_mlflow._log_hyperparameters(splink_model_json)¶
Simple method for extracting parameters from the splink settings and logging them as parameters in MLFlow.
- Parameters
splink_model_json (dict) – the settings json from a splink model.
- Return type
None
- arc.autolinker.splink_mlflow._log_splink_model_json(splink_model_json)¶
Log the splink settings json in its entirety under the name “linker.json”
- Parameters
splink_model_json (dict) – the settings json from a splink model.
- Return type
None
- arc.autolinker.splink_mlflow._save_splink_model_to_mlflow(linker, model_name)¶
Simple method for logging a splink model
- Parameters
linker (splink.spark.linker.SparkLinker) – Splink model object
model_name (str) – name to save model under
- Return type
None
- arc.autolinker.splink_mlflow.log_splink_model_to_mlflow(linker, model_name, log_parameters_charts=True, log_profiling_charts=False, params=None, metrics=None, artifacts=None)¶
Comprehensive logging of Splink attributes, parameters, charts and model JSON to MLFlow to provide easy reproducibility and tracking during model development and deployment. This will create a new run under which to log everything. Additional parameters and metrics can be logged through the params and metrics arguments.
For details on MLFlow please visit https://mlflow.org/.
This will log:
- All the m and u values trained, along with their training condition
- All the hyperparameters set during training (blocking rules, link type, sql_dialect etc.)
- Parameters
linker (splink.spark.linker.SparkLinker) – a trained Splink linkage model
model_name (str) – str, the name to log the model under
log_parameters_charts (bool) – boolean, whether to log parameter charts or not. Default to True as this is a quick operation.
log_profiling_charts (bool) – boolean, whether to log data profiling charts or not. Default to False as this requires running Spark jobs and can take time.
params (Optional[dict]) – dict[str: str]. Dictionary of param_name: param_value. Optional argument for logging arbitrary parameters.
metrics (Optional[dict]) – dict[str: double or int]. Dictionary of metric_name: metric_value. Optional argument for logging arbitrary metrics.
artifacts (Optional[dict]) – dict[str: str]. Dictionary of artifact_name: artifact_filepath. Optional argument for logging arbitrary artifacts (NB - artifacts must be written to disk before logging).
- Return type
None
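A hedged usage sketch, assuming linker is an already-trained splink.spark.linker.SparkLinker; the model name and the extra params/metrics shown are illustrative placeholders:
>>> from arc.autolinker.splink_mlflow import log_splink_model_to_mlflow
>>> log_splink_model_to_mlflow(
...     linker,                                    # a trained Splink linker
...     model_name="autolinker_model",             # illustrative model name
...     log_parameters_charts=True,                # quick to produce, on by default
...     log_profiling_charts=False,                # runs Spark jobs, off by default
...     params={"comparison_size_limit": "100000"},       # optional extra params
...     metrics={"information_gain_power_ratio": 1.2}     # optional extra metrics
... )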
- arc.autolinker.splink_mlflow._log_linker_charts(linker, log_parameters_charts, log_profiling_charts)¶
Log all the non-data related charts to MLFlow
- Parameters
linker (splink.spark.linker.SparkLinker) – a Splink linker object
log_parameters_charts (bool) – boolean, whether to log parameter charts or not
log_profiling_charts (bool) – boolean, whether to log data profiling charts or not
- Return type
None
- arc.autolinker.splink_mlflow._log_chart(chart_name, chart)¶
Save a chart to MLFlow. This writes the chart out temporarily for MLFlow to pick up and save.
- Parameters
chart_name (str) – the name the chart will be given in MLFlow
chart – chart object from Splink
- arc.autolinker.splink_mlflow.get_model_json(artifact_uri)¶
Returns a Splink model JSON from a model tracked with MLFlow. This can be retrieved either from a run or from the MLFlow Model Registry, if and only if the model was logged to MLFlow using the method splink.mlflow.log_splink_model_to_mlflow().
Extracting the linker’s JSON specification requires temporarily writing to /tmp on the local file system. All written files are deleted before the function returns.
Returns a splink setting json as a python dict.
- Parameters
artifact_uri (str) – URI pointing to the artifacts, such as “runs:/500cf58bee2b40a4a82861cc31a617b1/my_model.pkl”, “models:/my_model/Production”, or “s3://my_bucket/my/file.txt”.
- Return type
dict
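For example, retrieving the settings of a model tracked in the MLFlow Model Registry (the URI is one of the illustrative forms from the parameter description above):
>>> from arc.autolinker.splink_mlflow import get_model_json
>>> settings = get_model_json("models:/my_model/Production")
>>> type(settings)   # a plain python dict of Splink settings
<class 'dict'>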
Project Support¶
Please note that all projects in the industry-solutions github space are provided for your exploration only, and are not formally supported by Databricks with Service Level Agreements (SLAs). They are provided AS-IS and we do not make any guarantees of any kind. Please do not submit a support ticket relating to any issues arising from the use of these projects.
Any issues discovered through the use of this project should be filed as GitHub Issues on the Repo. They will be reviewed as time permits, but there are no formal SLAs for support.