SPSS files (.sav) and Parquet data can be ingested to create datasets in the Lakehouse. Parquet-based data must be accompanied by schema and metadata as described in the Crunch Logical Schema.
New data can be ingested via file uploads (using AWS Accelerated Upload APIs) or directly from an AWS S3 Bucket.
All ingestion happens through a DataSource. The steps include:
- Create a DataSource.
- Use the DataSource APIs to ingest via file uploads or direct from an S3 bucket.
A DataSource represents a single dataset.
A DataSource supports initial ingestion and subsequent updates to the data (see the next section).
Why do file uploads use AWS Accelerated Upload APIs?
The AWS Accelerated Upload API improves upload performance by routing uploads through regional data centers: your data is uploaded to a nearby AWS data center and then transferred internally to the Crunch Lakehouse Data Platform over the AWS global backbone network.
API references
- DataSource API Documentation
- DataSource File Upload API Documentation
- DataSource Ingestion Direct from S3
Updates
After creating a dataset through the DataSource API, you will likely need to update it. The update API provides the ability to make non-breaking schema and/or metadata changes to a dataset, including adding variables, adding categories to existing variables, and appending row-level data.
Currently, the update API only supports updates direct from S3.
Notifications
Notifications are supported for update operations. Whenever a dataset is updated, you can receive a notification for it.
Notifications are especially useful when the upstream and downstream systems of a data pipeline are decoupled or managed by different teams.
An upstream source of data uses the ingestion and update APIs to load new data into the Lakehouse. Downstream systems can subscribe to notifications about updates via the DataDestination.
A typical use case for update notifications is to trigger an export of new data.
Currently, notifications are delivered through an SQS queue provided by you.
Here's what a notification payload looks like:
{
    "event": {"action": "update"},
    "entities": {"datasources": {"lake_id": <dataset ID>}},
    "body": {"version": <dataset version>}
}
Python code examples
The following code examples rely on pycrunch, the public Crunch client package.
Installation
pip install pycrunch
Ingest a Parquet file directly from S3
The following code demonstrates how to ingest a Parquet-based dataset into Crunch Lakehouse. This process involves two main steps:
- Create a datasource. Only the name is required, and it can be used for identification.
- Ingest a Parquet dataset from a given S3 path. This step allows for additional configuration options, including:
- row_id_type: Specifies the expected type of row IDs for validation during ingestion.
- partition_by: Defines how the dataset will be partitioned in the lake to optimize query performance. By default, datasets are partitioned by var_name. Additional partitioning, for example by month in the sample_date column, can be specified as shown in the example below.
Additionally, the code waits for the ingestion process to complete and prints any errors that may occur during the process.
import pycrunch
from pycrunch.shoji import as_entity, wait_progress, as_value, TaskError
from pycrunch.progress import DefaultProgressTracking

site = pycrunch.connect(
    api_key="<crunch_api_key>",
    site_url="<crunch_api_url>",
)

# the first step is to create a datasource,
# as it represents a dataset in the lake.
datasource = site.datasources.create(
    as_entity(dict(name="<name>"))
)

# print datasource url
print(datasource.self)

# get datasource entity
entity = site.follow("datasources", {"name": "<name>"})
entity = entity.index[datasource.self].entity

# trigger ingest
response = entity.import_data.post(
    as_value(
        {
            "source": {
                "type": "s3folder",
                "bucket": "<bucket>",
                "path": "<path>",
            },
            "options": {
                "row_id_type": "string",
                "partition_by": "sample_date.months",
            },
        }
    )
)

# wait for ingestion to finish
tracker = DefaultProgressTracking(timeout=3600)
try:
    wait_progress(response, site.session, progress_tracker=tracker)
except TaskError as exc:
    print(exc)
Update
This process involves two key steps:
- Identify the target datasource — The name is used to search through available Datasources. Once identified, the URL is used to select the correct entity.
- Update the target datasource with a Parquet dataset from a specified S3 path
import pycrunch
from pycrunch.shoji import wait_progress, as_value, TaskError
from pycrunch.progress import DefaultProgressTracking

site = pycrunch.connect(
    api_key="<crunch_api_key>",
    site_url="<crunch_api_url>",
)

# get datasource entity
datasources = site.follow("datasources", {"name": "<name>"})
datasource = datasources.index[next(iter(datasources.index))].entity

# trigger update
response = datasource.update_data.post(
    as_value(
        {
            "source": {
                "type": "s3folder",
                "bucket": "<bucket>",
                "path": "<new_path>",
            },
        }
    )
)

# wait for update to finish
tracker = DefaultProgressTracking(timeout=18000)
try:
    wait_progress(response, site.session, progress_tracker=tracker)
except TaskError as exc:
    print(exc)
Upload and ingest an SAV file
The following example demonstrates how to ingest an SAV-based dataset into Crunch Lakehouse. This process involves four main steps:
- Create a datasource. Only the name is required and it can be used for identification.
- Request an upload link.
- Upload the SAV file.
- Ingest the dataset.
Additionally, the code waits for the ingestion process to complete and prints any errors that may occur during the process.
import os

import requests

import pycrunch
from pycrunch.shoji import as_entity, wait_progress, as_value, TaskError
from pycrunch.progress import DefaultProgressTracking

site = pycrunch.connect(
    api_key="<crunch_api_key>",
    site_url="<crunch_api_url>",
)

# the first step is to create a datasource,
# as it represents a dataset in the lake.
datasource = site.datasources.create(
    as_entity(dict(name="<name>"))
)

# print datasource url
print(datasource.self)

# get datasource entity
entity = site.follow("datasources", {"name": "<name>"})
entity = entity.index[datasource.self].entity

# get file upload links to POST the SAV file to.
# the upload targets API accepts multiple files for other types of upload,
# but only one file is needed for SAV uploads.
sav_path = "<path to sav file>"
sav_name = os.path.basename(sav_path)
response = entity.upload_targets.post(
    as_value({"filenames": [sav_name], "options": {}})
)
payload = response.json()["value"]
revision = payload["revision"]
filelist = (sav_path,)
file_map = ((f, os.path.split(f)) for f in filelist)
files = {name: path for (path, (_, name)) in file_map}

# Upload the sav file
for key, val in payload["file_paths"].items():
    with open(files[key], "rb") as f:
        file = {"file": (key, f)}
        requests.post(val["url"], data=val["fields"], files=file)

# Ingest the dataset
response = entity.ingest.post(
    as_value(
        {"revision_id": revision}
    )
)

# wait for ingestion to finish
tracker = DefaultProgressTracking(timeout=18000)
try:
    wait_progress(response, site.session, progress_tracker=tracker)
except TaskError as exc:
    print(exc)
An example SQS message. Note the relevant notification payload under Messages.Body:
{
'Messages': [
{
'MessageId': '3478b16c-9056-46b1-9416-8c32f6e98ed7',
'ReceiptHandle': 'AQEBXuhI+SixrPUkBz38qslbpJ86O9PzwUrSC8FDLV3IgfMZtmFdKnS/9EwrJJ7yQKLSQX/J5+JNViSRtTjyyufL3X5BS7RqRdxlrpw9K0U1S/601UOoiFqE2CC8jj1n6ymJjwSwy7+d/iicr2OaNPxFvkrbLkA/TTauyk7+W+p4tbxxyY670IKIV/Kcmz8EYbvWZs6Jx0e7K5yHlYEhWRpOHdHoEJuh2M2ALhagTG+03JXYKWXqLwlovA3obZ78EBSa7MZeDoMyINv1FjSj/DWVioFsfaX9Ole3uvyWkv8mzCVVN+7Gg2NyE15yifRj7vrLkUuiWI3lLy1J2So8HoljWX34kEGcq/hWk3Kv6PUepLZimK6mquAWObOhJDnaQtcoEPiztiiIei9pJ3YhBAoKLg==',
'MD5OfBody': '5c08b549e64153d0257963129d53f9b0',
'Body': '{"event": {"action": "update"}, "entities": {"http://cr-server.webserver.svc.cluster.local:8080/api/datasources/32dac0a5def04aa4b01d09aa9b08ebeb/": {"name": "demo-exports", "lake_id": "83d21e77-cd87-4d66-ae3d-607eaafad583"}, "http://cr-server.webserver.svc.cluster.local:8080/api/datadestinations/31058febfb234d8086c9286ded50f512/": {"name": "export_demo_dataset", "data": {"type": "s3folder", "bucket": "tmp-bucket-lake", "base": "export-demo-data"}}}, "body": {"version": "01JS27RRC2YMGC6RE8PVT2V7QR"}}'
}
],
'ResponseMetadata': {
'RequestId': '4b92e2e6-0781-5391-9980-829d2f1d31b5',
'HTTPStatusCode': 200,
'HTTPHeaders': {
'x-amzn-requestid': '4b92e2e6-0781-5391-9980-829d2f1d31b5',
'date': 'Thu, 17 Apr 2025 16:03:40 GMT',
'content-type': 'application/x-amz-json-1.0',
'content-length': '1103',
'connection': 'keep-alive'
},
'RetryAttempts': 0
    }
}
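As a sketch of the consumer side, the following example shows how a downstream system might poll the queue and extract the notification payload from Messages.Body. It is illustrative only: the queue URL is a placeholder for the SQS queue you provide, and boto3 with AWS credentials for that queue is assumed to be available; none of this is part of the Crunch APIs.
import json
import boto3

# placeholder for the SQS queue you provide to Crunch
QUEUE_URL = "<your_sqs_queue_url>"

sqs = boto3.client("sqs")

# long-poll the queue for update notifications
response = sqs.receive_message(
    QueueUrl=QUEUE_URL,
    MaxNumberOfMessages=1,
    WaitTimeSeconds=20,
)

for message in response.get("Messages", []):
    # the notification payload is a JSON document in the message body
    payload = json.loads(message["Body"])
    if payload["event"]["action"] == "update":
        version = payload["body"]["version"]
        # entities maps datasource/datadestination URLs to their details,
        # including the lake_id of the updated dataset
        for url, details in payload["entities"].items():
            print(url, details.get("lake_id"), details.get("name"))
        print("dataset updated to version", version)
        # a typical consumer would trigger an export of the new data here
    # delete the message once it has been handled so it is not redelivered
    sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=message["ReceiptHandle"])
Long polling (WaitTimeSeconds) avoids busy-waiting on an empty queue, and deleting a message only after it has been processed ensures notifications are not lost.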