Build machine learning-ready datasets from the Amazon SageMaker offline Feature Store using the Amazon SageMaker Python SDK

3 months ago 3

Amazon SageMaker Feature Store is simply a purpose-built work to store and retrieve diagnostic information for usage by instrumentality learning (ML) models. Feature Store provides an online store susceptible of low-latency, high-throughput reads and writes, and an offline store that provides bulk entree to each humanities grounds data. Feature Store handles the synchronization of information betwixt the online and offline stores.

Because exemplary improvement is an iterative process, customers volition often query the offline store and physique assorted datasets for exemplary training. Currently, determination are respective ways to entree features successful the offline store, including moving SQL queries with Amazon Athena oregon utilizing Spark SQL successful Apache Spark. However, these patterns necessitate penning advertisement hoc (and sometimes complex) SQL statements, which isn’t ever suitable for the information idiosyncratic persona.

Feature Store recently extended the SageMaker Python SDK to marque it easier to make datasets from the offline store. With this release, you tin usage a caller acceptable of methods successful the SDK to make datasets without penning SQL queries. These caller methods enactment communal operations specified arsenic clip travel, filtering duplicate records, and joining aggregate diagnostic groups portion ensuring point-in-time accuracy.

In this post, we show however to usage the SageMaker Python SDK to physique ML-ready datasets without penning immoderate SQL statements.

Solution overview

To show the caller functionality, we enactment with 2 datasets: leads and web selling metrics. These datasets tin beryllium utilized to physique a exemplary that predicts if a pb volition person into a merchantability fixed selling activities and metrics captured for that lead.

The leads information contains accusation connected prospective customers who are identified utilizing Lead_ProspectID. The features for a pb (for example, LeadSource) tin beryllium updated implicit time, which results successful a caller grounds for that lead. The Lead_EventTime represents the clip successful which each grounds is created. The pursuing screenshot shows an illustration of this data.

The web selling metrics information tracks the engagement metrics for a lead, wherever each pb is identified utilizing the Web_ProspectID. The Web_EventTime represents the clip successful which the grounds was created. Unlike the leads diagnostic group, determination is lone 1 grounds per pb successful this diagnostic group. The pursuing screenshot shows an illustration of this data.

We locomotion done the cardinal parts of the sagemaker-feature-store-offline-sdk.ipynb notebook, which demonstrates the pursuing steps:

  1. Create a dataset from a diagnostic group.
  2. Join aggregate diagnostic groups.
  3. Create a point-in-time articulation betwixt a diagnostic radical and a dataset based connected a acceptable of events astatine circumstantial timestamps.
  4. Retrieve diagnostic past wrong a circumstantial clip range.
  5. Retrieve features arsenic of a circumstantial timestamp.


You request the pursuing prerequisites:

git clone

We presume a diagnostic radical for the leads information has been created utilizing the existing FeatureGroup.create method, and tin beryllium referenced utilizing the adaptable base_fg. For much accusation connected diagnostic groups, notation to Create Feature Groups.

Create a dataset from a diagnostic group

To make a dataset utilizing the SageMaker SDK, we usage the caller FeatureStore class, which contains the create_dataset method. This method accepts a basal diagnostic radical that whitethorn beryllium joined with different diagnostic groups oregon DataFrames. We commencement by providing the leads diagnostic radical arsenic the basal and an Amazon Simple Storage Service (Amazon S3) way to store the dataset:

from sagemaker.feature_store.feature_store import FeatureStore feature_store = FeatureStore(sagemaker_session=feature_store_session) ds1_builder = feature_store.create_dataset (base=base_fg, output_path=f"s3://{s3_bucket_name}/dataset_query_results",)

The create_dataset method returns a DatasetBuilder object, which tin beryllium utilized to make a dataset from 1 oregon aggregate diagnostic groups (which we show successful the adjacent section). To make a elemental dataset consisting of lone the leads features, we invoke the to_csv_file method. This runs a query successful Athena to retrieve the features from the offline store, and saves the results to the specified S3 path.

csv, query = ds1_builder.to_csv_file() # Show S3 determination of CSV file print(f'CSV file: {csv}')

Join aggregate diagnostic groups

With the SageMaker SDK, you tin easy articulation aggregate diagnostic groups to physique a dataset. You tin besides execute articulation operations betwixt an existing Pandas DataFrame to a azygous oregon aggregate diagnostic groups. The base diagnostic group is an important conception for joins. The basal diagnostic radical is the diagnostic radical that has different diagnostic groups oregon the Pandas DataFrame joined to it.

While creating the dataset utilizing the create_dataset function, we usage the with_feature_group method, which performs an interior articulation betwixt the basal diagnostic radical and different diagnostic radical utilizing the grounds identifier and the people diagnostic sanction successful the basal diagnostic group. In our example, the basal diagnostic radical is the leads diagnostic group, and the people diagnostic radical is the web selling diagnostic group. The with_feature_group method accepts the pursuing arguments:

  • feature_group – This is the diagnostic radical we are joining with. In our codification sample, the people diagnostic radical is created by utilizing the web selling dataset.
  • target_feature_name_in_base – The sanction of the diagnostic successful the basal diagnostic radical that we’re utilizing arsenic a cardinal successful the join. We usage Lead_ProspectID arsenic the grounds identifier for the basal diagnostic group.
  • included_feature_names – This is the database of the diagnostic names of the basal diagnostic group. We usage this tract to specify the features that we privation to see successful the dataset.

The pursuing codification shows an illustration of creating a dataset by joining the basal diagnostic radical with the people diagnostic group:

join_builder = feature_store.create_dataset(base=base_fg, output_path=f"s3://{s3_bucket_name}/dataset_query_results").with_feature_group( feature_group=target_fg, target_feature_name_in_base="Lead_ProspectID", included_feature_names=["Web_ProspectID", "LastCampaignActivity","PageViewsPerVisit", "TotalTimeOnWebsite","TotalWebVisits", "AttendedMarketingEvent","OrganicSearch", "ViewedAdvertisement",],)

You tin widen the articulation operations to see aggregate diagnostic groups by adding the with_feature_group method astatine the extremity of the preceding codification illustration and defining the required arguments for the caller diagnostic group. You tin besides execute articulation operations with an existing DataFrame by defining the basal to beryllium your existing Pandas DataFrame and joining with the funny diagnostic groups. The pursuing codification illustration shows however to make dataset with an existing Pandas DataFrame and an existing diagnostic group:

ds2_builder = feature_store.create_dataset( base=new_records_df2, # Pandas DataFrame event_time_identifier_feature_name="Lead_EventTime", record_identifier_feature_name="Lead_ProspectID", output_path=f"s3://{s3_bucket_name}/dataset_query_results",).with_feature_group( base_fg, "Lead_ProspectID", ["LeadSource"])

For much examples connected these assorted configurations, notation to Create a Dataset from your Feature Groups.

Create a point-in-time join

One of the astir almighty capabilities of this enhancement is to execute point-in-time joins simply and without the request to constitute analyzable SQL code. When gathering ML models, information scientists request to debar data leakage oregon people leakage, which is accidentally utilizing information during exemplary grooming that wouldn’t beryllium disposable astatine the clip of prediction. For instance, if we’re trying to foretell recognition paper fraud, we should exclude transactions that get aft the fraudulent complaint we’re trying to predict, different the trained exemplary could usage this post-fraud accusation to change the model, making it generalize little well.

Retrieval of point-in-time close diagnostic information requires you to proviso an entity DataFrame that provides a acceptable of grounds IDs (or superior key) and corresponding lawsuit times that service arsenic the cutoff clip for the event. This retrieval mechanics is sometimes referred to arsenic row-level clip travel, due to the fact that it allows a antithetic clip constraint to beryllium applied for each enactment key. To execute point-in-time joins with the SageMaker SDK, we usage the Dataset Builder people and supply the entity DataFrame arsenic the basal statement to the constructor.

In the pursuing code, we make a elemental entity DataFrame with 2 records. We acceptable the lawsuit times, utilized to bespeak the cutoff time, adjacent the mediate of the clip bid information (mid-January 2023):

# Create Events (entity table) dataframe to walk Timestamp for Point-in-Time Join events = [['2023-01-20T00:00:00Z', record_id1], ['2023-01-15T00:00:00Z', record_id2]] df_events = pd.DataFrame(events, columns=['Event_Time', 'Lead_ProspectID'])

When we usage the point_in_time_accurate_join functionality with the create_dataset call, the interior query excludes each records with timestamps aboriginal past the cutoff times supplied, returning the latest diagnostic values that would person been disposable astatine the clip of the event:

# Create Dataset Builder utilizing point-in-time-accurate-join function pit_builder = feature_store.create_dataset( base=df_events, event_time_identifier_feature_name='Event_Time', record_identifier_feature_name='Lead_ProspectID', output_path=f"s3://{s3_bucket_name}/{s3_prefix}/dataset_query_results" ).with_feature_group(base_fg, "Lead_ProspectID" ).point_in_time_accurate_join( ).with_number_of_recent_records_by_record_identifier(1)

Notice that determination are lone 2 records successful the DataFrame returned by the point-in-time join. This is due to the fact that we lone submitted 2 grounds IDs successful the entity DataFrame, 1 for each Lead_ProspectID we privation to retrieve. The point-in-time criteria specifies that a record’s lawsuit clip (stored successful the Lead_Eventtime field) indispensable incorporate a worth that is little than the cutoff time.

Additionally, we instruct the query to retrieve lone the latest grounds that meets this criteria due to the fact that we person applied the with_number_of_recent_records_by_record_identifier method. When utilized successful conjunction with the point_in_time_accurate_join method, this allows the caller to specify however galore records to instrumentality from those that conscionable the point-in-time articulation criteria.

Compare point-in-time articulation results with Athena query results

To verify the output returned by the SageMaker SDK point_in_time_accurate_join function, we comparison it to the effect of an Athena query. First, we make a modular Athena query utilizing a SELECT connection tied to the circumstantial array created by the Feature Store runtime. This array sanction tin beryllium recovered by referencing the table_name tract aft instantiating the athena_query from the FeatureGroup API:

SELECT * FROM "sagemaker_featurestore"."off_sdk_fg_lead_1682348629" WHERE "off_sdk_fg_lead_1682348629"."Lead_ProspectID" = '5e84c78f-6438-4d91-aa96-b492f7e91029'

The Athena query doesn’t incorporate immoderate point-in-time articulation semantics, truthful it returns each records that lucifer the specified record_id (Lead_ProspectID).

Next, we usage the Pandas room to benignant the Athena results by lawsuit times for casual comparison. The records with timestamps aboriginal than the lawsuit times specified successful the entity DataFrame (for example, 2023-01-15T00:00:00Z) submitted to the point_in_time_accurate_join don’t amusement up successful the point-in-time results. Because we additionally specified that we lone privation a azygous grounds from the preceding create_dataset code, we lone get the latest grounds anterior to the cutoff time. By comparing the SageMaker SDK results with the Athena query results, we spot that the point-in-time articulation relation returned the due records.

Therefore, we person assurance that we tin usage the SageMaker SDK to execute row-level clip question and debar people leakage. Furthermore, this capableness works crossed aggregate diagnostic groups that whitethorn beryllium refreshed connected wholly antithetic timelines.

Retrieve diagnostic past wrong a circumstantial clip range

We besides privation to show the usage of specifying a clip scope model erstwhile joining the diagnostic groups to signifier a dataset. The clip model is defined utilizing with_event_time_range, which accepts 2 inputs, starting_timestamp and ending_timestamp, and returns a dataset builder object. In our codification sample, we acceptable the retrieval clip model for 1 afloat time from 2022-07-01 00:00:00 until 2022-07-02 00:00:00.

The pursuing codification shows however to make a dataset with the specified lawsuit clip model portion joining the basal diagnostic radical with the people diagnostic group:

# Setup Event Time window: seconds of unix epoch time # Start astatine 07/01/2022 and acceptable clip model to 1 day start_ts = 1656633600 time_window = 86400 # Using hard-coded timestamps from dataset, past adding clip window datetime_start = datetime.fromtimestamp(start_ts) datetime_end = datetime.fromtimestamp(start_ts+time_window) print(f'Setting retrieval clip window: {datetime_start} until {datetime_end}') time_window_builder = (feature_store.create_dataset( base=base_fg, output_path=f"s3://{s3_bucket_name}/dataset_query_results").with_feature_group( feature_group=target_fg, target_feature_name_in_base="Lead_ProspectID", included_feature_names=["Web_ProspectID","LastCampaignActivity","PageViewsPerVisit", "TotalTimeOnWebsite","TotalWebVisits","AttendedMarketingEvent", "OrganicSearch","ViewedAdvertisement",],) .with_event_time_range(starting_timestamp=datetime_start,ending_timestamp=datetime_end))

We besides corroborate the quality betwixt the sizes of the dataset created utilizing with_event_time_range by exporting to a Pandas DataFrame with the to_dataframe() method and displaying the data. Notice however the effect acceptable has lone a fraction of the archetypal 10,020 records, due to the fact that it lone retrieves records whose event_time is wrong the 1-day clip period.

Retrieve features arsenic of a circumstantial timestamp

The DatasetBuilder as_of method retrieves features from a dataset that conscionable a timestamp-based constraint, which the caller provides arsenic an statement to the function. This mechanics is utile for scenarios specified arsenic rerunning experiments connected antecedently collected data, backtesting clip bid models, oregon gathering a dataset from a erstwhile authorities of the offline store for information auditing purposes. This functionality is sometimes referred to arsenic clip question due to the fact that it fundamentally rolls backmost the information store to an earlier day and time. This clip constraint is besides referred to arsenic the cutoff timestamp.

In our illustration code, we archetypal make the cutoff timestamp by speechmaking the write_time worth for the past grounds written to the Feature Store, the 1 written with put_record. Then we supply this cutoff timestamp to the DatasetBuilder arsenic an statement to the as_of method:

# Create dataset utilizing as-of timestamp print(f'using cut-off time: {asof_cutoff_datetime}') as_of_builder = feature_store.create_dataset( base=base_fg, output_path=f"s3://{s3_bucket_name}/{s3_prefix}/dataset_query_results").with_feature_group( feature_group=target_fg, target_feature_name_in_base='Lead_ProspectID', included_feature_names=['Web_ProspectID','Web_EventTime', 'TotalWebVisits']).as_of(asof_cutoff_datetime)

It’s important to enactment that the as_of method applies the clip constraint to the interior write_time field, which is automatically generated by Feature Store. The write_time tract represents the existent timestamp erstwhile the grounds is written to the information store. This is antithetic than different methods similar point-in-time-accurate-join and with_event_time_range that usage the client-provided event_time tract arsenic a comparator.

Clean up

Be definite to delete each the resources created arsenic portion of this illustration to debar incurring ongoing charges. This includes the diagnostic groups and the S3 bucket containing the offline store data.

SageMaker Python SDK acquisition vs. penning SQL

The caller methods successful the SageMaker Python SDK let you to rapidly make datasets and determination to the grooming measurement rapidly during the ML lifecycle. To amusement the clip and effort that tin beryllium saved, let’s analyse a usage lawsuit wherever we request to articulation 2 diagnostic groups portion retrieving the features wrong a specified clip frame. The pursuing fig compares the Python queries connected the offline Feature Store vs. SQL utilized to make the dataset down a Python query.

As you tin see, the aforesaid cognition of joining 2 diagnostic groups requires you to make a long, analyzable SQL query, whereas it tin beryllium accomplished utilizing conscionable the with_feature_group and with_event_time_range methods successful the SageMaker Python SDK.


The caller offline store methods successful the Python SageMaker SDK let you to query your offline features without having to constitute analyzable SQL statements. This provides a seamless acquisition for customers who are accustomed to penning Python codification during exemplary development. For much accusation astir diagnostic groups, notation to Create a Dataset From Your Feature Groups and Feature Store APIs: Feature Group.

The afloat illustration successful this station tin beryllium recovered successful the GitHub repository. Give it a effort and fto america cognize your feedback successful the comments.

About the Authors

Paul Hargis has focused his efforts connected instrumentality learning astatine respective companies, including AWS, Amazon, and Hortonworks. He enjoys gathering exertion solutions and teaching radical however to leverage them. Paul likes to assistance customers grow their instrumentality learning initiatives to lick real-world problems. Prior to his relation astatine AWS, helium was pb designer for Amazon Exports and Expansions, helping amended the acquisition for planetary shoppers.

Mecit Gungor is an AI/ML Specialist Solution Architect astatine AWS helping customers plan and physique AI/ML solutions astatine scale. He covers a wide scope of AI/ML usage cases for Telecommunication customers and presently focuses connected Generative AI, LLMs, and grooming and inference optimization. He tin often beryllium recovered hiking successful the wilderness oregon playing committee games with his friends successful his escaped time.

Tony Chen is a Machine Learning Solutions Architect astatine AWS, helping customers plan scalable and robust instrumentality learning capabilities successful the cloud. As a erstwhile information idiosyncratic and information engineer, helium leverages his acquisition to assistance tackle immoderate of the astir challenging problems organizations look with operationalizing instrumentality learning.

Sovik Kumar Nath is an AI/ML solution designer with AWS. He has extended acquisition successful end-to-end designs and solutions for instrumentality learning; concern analytics wrong financial, operational, and selling analytics; healthcare; proviso chain; and IoT. Outside work, Sovik enjoys traveling and watching movies.

Read Entire Article