Creating New Aggregations and Features¶
Users can create new DataElement/Feature aggregations and features in the notebook for analytical purposes. Aggregation definitions created here can be saved in .py files, which can then be uploaded to register new DataElements/Features on the platform under the DataElement/Feature registry.
- Creating New Aggregations
- Creating New Features
Creating New Aggregations¶
# Import required package
from corridor import DataTable
# read in the Application Level Dataset
loan_table = DataTable('loan_table').to_spark()
# subset loan performance table to keep only the records for the loan_id in loan_table
loan_perf_table = DataTable('performance_table').to_spark()
loan_perf_table = loan_table.join(loan_perf_table, on='corridor_loan_id')
loan_perf_table = loan_perf_table[['corridor_loan_id', 'DUE_AMT', 'FEE_PAID', 'COAMT']]
loan_perf_table.limit(2).toPandas()
|   | corridor_loan_id | DUE_AMT | FEE_PAID | COAMT |
|---|---|---|---|---|
| 0 | 48043066575 | 388.86 | 0.0 | 0.0 |
| 1 | 48043066575 | 388.86 | 0.0 | 0.0 |
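The inner join above keeps only performance rows whose corridor_loan_id appears in loan_table. A minimal pandas sketch of the same subsetting, using toy frames (the ids and amounts below are illustrative, not from the real tables):

```python
import pandas as pd

# Toy stand-ins for loan_table and performance_table
loan = pd.DataFrame({'corridor_loan_id': [1, 2]})
perf = pd.DataFrame({
    'corridor_loan_id': [1, 2, 3],
    'DUE_AMT': [388.86, 582.17, 100.0],
})

# the inner join drops id 3, which is absent from loan
subset = loan.merge(perf, on='corridor_loan_id')
```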
Using loan_perf_table, we generate three aggregated DataElements by taking the average of certain fields over the performance period. In this illustration, we create a pandas version of the aggregated DataElements by leveraging a registered global function, calculate_mean. Note that aggregates can also be created without using a registered global function.
1. due_amt: Average of DUE_AMT over the performance period
2. fee_paid: Average of FEE_PAID over the performance period
3. chargeoff_amt: Average of COAMT over the performance period
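As noted above, these aggregates can also be built without a registered global function. A minimal sketch in plain pandas, using a toy DataFrame shaped like loan_perf_table.toPandas() (the values are illustrative):

```python
import pandas as pd

# Toy rows with the same column names as loan_perf_table
df = pd.DataFrame({
    'corridor_loan_id': [1, 1, 2],
    'DUE_AMT': [100.0, 200.0, 50.0],
    'FEE_PAID': [0.0, 10.0, 0.0],
    'COAMT': [0.0, 0.0, 5.0],
})

# plain groupby/mean, no registered global function involved
agg = df.groupby('corridor_loan_id', as_index=False).agg(
    due_amt=('DUE_AMT', 'mean'),
    fee_paid=('FEE_PAID', 'mean'),
    chargeoff_amt=('COAMT', 'mean'),
)
```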
# Instantiate a GlobalFunction
from corridor import GlobalFunction
calculate_mean = GlobalFunction('calculate_mean')
# Extract python function from calculate_mean
calculate_mean_python_fn = calculate_mean.get_python_function()
print(calculate_mean.definition)
'''
Returns average of specific column of given dataframe
'''
if df[colname] is None:
    return None
else:
    return df[colname].mean(skipna=True)
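For readers without platform access, the printed definition can be reproduced as a plain Python function. This local copy is a sketch based on the output above, not the registered artifact itself:

```python
import pandas as pd

def calculate_mean(df, colname):
    '''
    Returns average of specific column of given dataframe
    '''
    if df[colname] is None:
        return None
    else:
        return df[colname].mean(skipna=True)

# skipna=True ignores the missing value when averaging
df = pd.DataFrame({'DUE_AMT': [100.0, None, 200.0]})
avg = calculate_mean(df, 'DUE_AMT')  # 150.0
```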
# import necessary function
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pyspark.sql.functions as F
from pyspark.sql.types import *
aggregate_key = 'corridor_loan_id'
cols_to_agg = ['DUE_AMT', 'FEE_PAID', 'COAMT']
result_schema = loan_perf_table.schema
# GROUPED_MAP applies create_group_mean once per group and stitches the
# per-group results into a DataFrame with the given schema (columns matched by position)
@pandas_udf(result_schema, functionType=PandasUDFType.GROUPED_MAP)
def create_group_mean(df):
    import pandas as pd
    # average each column of interest within this group
    mean_result = []
    for col in cols_to_agg:
        mean_result += [calculate_mean_python_fn(df, col)]
    key = df[aggregate_key].iloc[0]
    # one output row per group: the key followed by the three averages
    return pd.DataFrame([[key] + mean_result])
loan_perf_table_agg = loan_perf_table.groupby(aggregate_key).apply(create_group_mean)
loan_perf_table_agg.show(5)
+----------------+-----------------+--------+-----------------+
|corridor_loan_id|          DUE_AMT|FEE_PAID|            COAMT|
+----------------+-----------------+--------+-----------------+
|     46123043808|269.6757142857142|     0.0|              0.0|
|     45064090452|           582.17|     0.0|              0.0|
|     45165073592|           655.71|     0.0|              0.0|
|     45057051387|348.8699999999999|     0.0|              0.0|
|     44038065001|773.8063888888889|     0.0|82.54697582222222|
+----------------+-----------------+--------+-----------------+
only showing top 5 rows
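To see what create_group_mean does to a single group, the grouped-map step can be mimicked locally: Spark hands the function one group at a time as a pandas DataFrame. A sketch with toy values (the ids and amounts below are assumed for illustration):

```python
import pandas as pd

cols_to_agg = ['DUE_AMT', 'FEE_PAID', 'COAMT']
aggregate_key = 'corridor_loan_id'

def create_group_mean(df):
    # mirrors the UDF body: one average per aggregated column
    mean_result = [df[col].mean(skipna=True) for col in cols_to_agg]
    key = df[aggregate_key].iloc[0]
    return pd.DataFrame([[key] + mean_result])

# one group's rows, as Spark would hand them to the UDF
group = pd.DataFrame({
    'corridor_loan_id': [45064090452, 45064090452],
    'DUE_AMT': [582.17, 582.17],
    'FEE_PAID': [0.0, 0.0],
    'COAMT': [0.0, 0.0],
})
result = create_group_mean(group)  # one row: key, 582.17, 0.0, 0.0
```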
# Summary of Aggregated DataElements
loan_perf_table_agg.drop('corridor_loan_id').describe().toPandas()
|   | summary | DUE_AMT | FEE_PAID | COAMT |
|---|---|---|---|---|
| 0 | count | 83 | 83 | 83 |
| 1 | mean | 454.22920202458596 | 0.17690203120172998 | 17.52316458389905 |
| 2 | stddev | 251.82541903885442 | 1.104910021444327 | 77.88027802307363 |
| 3 | min | 0.0 | 0.0 | 0.0 |
| 4 | max | 1204.42 | 9.733333333333333 | 504.83557453333333 |
Creating New Features¶
Just like aggregations, users can create new features in the notebook. Feature definitions created here can be saved in .py files, which can then be uploaded to register new features on the platform under the feature registry.
# Import necessary packages
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
Reading application data
# Import required package
from corridor import DataTable
# read in the Application Level Dataset
application_table = DataTable('application').to_spark()
# keep first 1000 records
application_table = application_table.limit(1000)
Feature: Flag to identify good customer
# Defining feature logic
def good_customer(annual_income, fico):
if annual_income > 50000:
if fico > 720:
return 'yes'
else:
return 'maybe'
else:
return 'no'
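Because good_customer is plain Python, its branches can be sanity-checked directly before wrapping it in a UDF (the incomes and FICO scores below are made-up values):

```python
def good_customer(annual_income, fico):
    if annual_income > 50000:
        if fico > 720:
            return 'yes'
        else:
            return 'maybe'
    else:
        return 'no'

# exercise all three branches
good_customer(80000, 750)  # 'yes'
good_customer(80000, 700)  # 'maybe'
good_customer(40000, 750)  # 'no'
```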
# Converting python function to spark udf
udf_good_customer = F.udf(good_customer, StringType())
# Creating feature using variables present in df
application_table_with_features = application_table.withColumn(
    "good_customer", udf_good_customer("annual_inc", "fico_range_high"))
# Frequency table of generated feature
application_table_with_features.groupby("good_customer").count().show()
+-------------+-----+
|good_customer|count|
+-------------+-----+
|           no|  293|
|          yes|  266|
|        maybe|  441|
+-------------+-----+