Ingestion/Importing data
SPSS files (.sav) and Parquet data can be ingested to create datasets in the Lakehouse. Parquet-based data must be accompanied by schema and metadata as described in the Crunch Logical Schema.
New data can be ingested via file uploads (using AWS Accelerated Upload APIs) or directly from an AWS S3 Bucket.
All ingestion happens through a DataSource. The steps include:
- Create a DataSource.
- Use the DataSource APIs to ingest via file uploads or direct from an S3 bucket.
A DataSource represents a single dataset.
A DataSource supports initial ingestion and subsequent updates to the data (see the next section).
Why do file uploads use AWS Accelerated Upload APIs?
The AWS Accelerated Upload API routes uploads through regional data centers, improving upload performance. Your data is first uploaded to a nearby AWS data center and then transferred internally to the Crunch Lakehouse Data Platform over the AWS global backbone network.
API references
- DataSource API Documentation
- DataSource File Upload API Documentation
- DataSource Ingestion Direct from S3
Generated Crunch Automation scripts for SPSS files
Lakehouse SAV ingestion automatically generates Crunch Automation scripts to augment and enhance the raw data. Depending on the content of the SAV file, scripts may be generated to:
- Create categorical arrays: similar categorical variables are combined to facilitate analysis.
- Recode multiple response categorical variables: multiple response variables are recoded when the SPSS file makes no distinction between survey choices being seen by respondents or simply not selected.
- Create multiple response segmentations: cross-cutting categorical arrays are created from similar multiple response variables to facilitate analysis.
- Create multiple response dashboards: dashboards are created for similar multiple response variables to facilitate analysis.
Generated scripts will appear under a Generated Scripts tab in the dataset Automation menu where they can be individually reviewed, copied, edited and applied.
API references
Updating data
After creating a dataset through the DataSource API, you will likely need to update it. The update API provides the capability to add non-breaking schema and/or metadata changes to a dataset.
This includes the ability to add variables to a dataset, add categories to existing variables, and append row-level data.
Currently, the Update API only supports updates direct from S3.
Update Notifications
Notifications are supported for update operations. Any time there is an update to a dataset, you may receive a notification for it.
Notifications are important when the upstream and downstream systems of a data pipeline are decoupled or managed by different teams.
An upstream source of data uses the ingestion and update APIs to load new data into the Lakehouse; downstream systems can subscribe to notifications about those updates via a DataDestination.
A typical use case for update notifications is to trigger an export of new data.
Currently, notifications are delivered through an SQS queue provided by you.
Here's what a notification payload looks like:
{
"event": {"action": "update"},
"entities": {"datasources": {"lake_id": <dataset ID>}},
"body": {"version": <dataset version>}
}
Example payload
An SQS message example. Note the relevant part under Messages.Body:
{
'Messages': [
{
'MessageId': '3478b16c-9056-46b1-9416-8c32f6e98ed7',
'ReceiptHandle': 'AQEBXuhI+SixrPUkBz38qslbpJ86O9PzwUrSC8FDLV3IgfMZtmFdKnS/9EwrJJ7yQKLSQX/J5+JNViSRtTjyyufL3X5BS7RqRdxlrpw9K0U1S/601UOoiFqE2CC8jj1n6ymJjwSwy7+d/iicr2OaNPxFvkrbLkA/TTauyk7+W+p4tbxxyY670IKIV/Kcmz8EYbvWZs6Jx0e7K5yHlYEhWRpOHdHoEJuh2M2ALhagTG+03JXYKWXqLwlovA3obZ78EBSa7MZeDoMyINv1FjSj/DWVioFsfaX9Ole3uvyWkv8mzCVVN+7Gg2NyE15yifRj7vrLkUuiWI3lLy1J2So8HoljWX34kEGcq/hWk3Kv6PUepLZimK6mquAWObOhJDnaQtcoEPiztiiIei9pJ3YhBAoKLg==',
'MD5OfBody': '5c08b549e64153d0257963129d53f9b0',
'Body': '{"event": {"action": "update"}, "entities": {"http://cr-server.webserver.svc.cluster.local:8080/api/datasources/32dac0a5def04aa4b01d09aa9b08ebeb/": {"name": "demo-exports", "lake_id": "83d21e77-cd87-4d66-ae3d-607eaafad583"}, "http://cr-server.webserver.svc.cluster.local:8080/api/datadestinations/31058febfb234d8086c9286ded50f512/": {"name": "export_demo_dataset", "data": {"type": "s3folder", "bucket": "tmp-bucket-lake", "base": "export-demo-data"}}}, "body": {"version": "01JS27RRC2YMGC6RE8PVT2V7QR"}}'
}
],
'ResponseMetadata': {
'RequestId': '4b92e2e6-0781-5391-9980-829d2f1d31b5',
'HTTPStatusCode': 200,
'HTTPHeaders': {
'x-amzn-requestid': '4b92e2e6-0781-5391-9980-829d2f1d31b5',
'date': 'Thu, 17 Apr 2025 16:03:40 GMT',
'content-type': 'application/x-amz-json-1.0',
'content-length': '1103',
'connection': 'keep-alive'
},
'RetryAttempts': 0
    }
}
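To consume these notifications, a downstream system can poll the queue and parse the JSON payload in each message body. Below is a minimal sketch using boto3; the queue URL and region are placeholders you supply, and the exact entity URLs in the payload will depend on your environment.
import json
import boto3

# Placeholders: the SQS queue you provided to Crunch and its region.
queue_url = "<your SQS queue URL>"
sqs = boto3.client("sqs", region_name="<region>")

# Long-poll for update notifications.
response = sqs.receive_message(
    QueueUrl=queue_url,
    MaxNumberOfMessages=10,
    WaitTimeSeconds=20,
)

for message in response.get("Messages", []):
    payload = json.loads(message["Body"])
    if payload.get("event", {}).get("action") == "update":
        version = payload["body"]["version"]
        # "entities" is keyed by API URLs; datasource entries carry a lake_id.
        for url, entity in payload.get("entities", {}).items():
            if "lake_id" in entity:
                print(f"{entity.get('name')} updated to version {version}")
    # Delete the message once it has been processed.
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"])
A handler like this could use the reported version to decide whether to trigger an export of the new data via the associated DataDestination.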
S3 Ingest Permissions
When ingesting/importing data from an S3 bucket, the Crunch platform must have access to the source bucket (or folder).
A bucket policy must be attached to the source bucket that grants the ListObjects/ListObjectsV2 (s3:ListBucket) and GetObject (s3:GetObject) permissions to AWS account 993035761779 and, if needed, to the role arn:aws:iam::993035761779:role/eks-1-prod-eu-west-1-crunch-lake-consumer-service-account-role.
Example policy
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Deny",
            "Principal": {
                "AWS": "*"
            },
            "Action": "s3:*",
            "Resource": [
                "arn:aws:s3:::<your bucket>",
                "arn:aws:s3:::<your bucket>/*"
            ],
            "Condition": {
                "Bool": {
                    "aws:SecureTransport": "false"
                }
            }
        },
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::993035761779:role/eks-1-prod-eu-west-1-crunch-lake-consumer-service-account-role"
            },
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::<your bucket>/*"
        },
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::993035761779:role/eks-1-prod-eu-west-1-crunch-lake-consumer-service-account-role"
            },
            "Action": "s3:ListBucket",
            "Resource": "arn:aws:s3:::<your bucket>"
        }
    ]
}
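For illustration, the policy can also be attached programmatically. The sketch below assumes the example policy has been saved locally as bucket-policy.json and that the caller has permission to modify the bucket policy; the bucket name and file name are placeholders.
import json
import boto3

# Placeholders: your bucket and a local copy of the example policy above.
bucket = "<your bucket>"
with open("bucket-policy.json") as f:
    policy_document = f.read()

s3 = boto3.client("s3")
# Attach (or replace) the bucket policy so the Crunch consumer role can
# list the bucket and read its objects.
s3.put_bucket_policy(Bucket=bucket, Policy=policy_document)

# Optionally confirm what is now attached.
attached = json.loads(s3.get_bucket_policy(Bucket=bucket)["Policy"])
print(json.dumps(attached, indent=2))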
Python code examples
The following code examples rely on the public Crunch client pycrunch package.
Installation
pip install pycrunch
Import data
Ingest a Parquet file directly from S3
The following code demonstrates how to ingest a Parquet-based dataset into Crunch Lakehouse. This process involves two main steps:
- Create a datasource — Only the name is required and it can be used for identification.
- Ingest a Parquet dataset from a given S3 path — This step allows for additional configuration options, including:
- row_id_type: Specifies the expected type of row IDs for validation during ingestion.
- partition_by: Defines how the dataset will be partitioned in the lake to optimize query performance. By default, datasets are partitioned by var_name. Additional partitioning, for example by month in the sample_date column, can be specified as shown in the example below.
The S3 bucket should be structured as:
<path>/data/<parquet files>
<path>/meta/<schema & metadata files>
(e.g. schema.json, metadata.json, folders.json, ...)
Additionally, the code waits for the ingestion process to complete and prints any errors that may occur during the process.
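Before triggering the ingest, you may want to confirm that the bucket follows this layout. Here is a minimal sketch using boto3; the bucket and path placeholders match those used in the ingest example below.
import boto3

# Placeholders matching the ingest example below.
bucket = "<bucket>"
path = "<path>"

s3 = boto3.client("s3")
# List a few keys under data/ and meta/ to confirm the expected layout.
for prefix in (f"{path}/data/", f"{path}/meta/"):
    listing = s3.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=5)
    keys = [obj["Key"] for obj in listing.get("Contents", [])]
    print(prefix, "->", keys or "nothing found")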
import pycrunch
from pycrunch.shoji import as_entity, wait_progress, as_value, TaskError
from pycrunch.progress import DefaultProgressTracking
site = pycrunch.connect(
api_key="<crunch_api_key>",
site_url="<crunch_api_url>",
)
# the first step is to create a datasource,
# as it represents a dataset in the lake.
datasource = site.datasources.create(
as_entity(dict(name="<name>"))
)
# print datasource url
print(datasource.self)
# get datasource entity
entity = site.follow("datasources", {"name": "<name>"})
entity = entity.index[datasource.self].entity
# trigger ingest
response = entity.import_data.post(
as_value(
{
"source": {
"type": "s3folder",
"bucket": "<bucket>",
"path": "<path>",
},
"options": {
"row_id_type": "string",
"partition_by": "sample_date.months",
}
}
)
)
# wait for ingestion to finish
tracker = DefaultProgressTracking(timeout=3600)
try:
wait_progress(response, site.session, progress_tracker=tracker)
except TaskError as exc:
print(exc)
API references
Update data
This process involves two key steps:
- Identify the target datasource — The name is used to search through available Datasources. Once identified, the URL is used to select the correct entity.
- Update target datasource with a Parquet dataset from a specified s3 path
import pycrunch
from pycrunch.shoji import wait_progress, as_value, TaskError
from pycrunch.progress import DefaultProgressTracking
site = pycrunch.connect(
api_key="<crunch_api_key>",
site_url="<crunch_api_url>",
)
# get datasource entity
datasources = site.follow("datasources", {"name": "<name>"})
datasource = datasources.index[next(iter(datasources.index))].entity
# trigger update
response = datasource.update_data.post(
as_value(
{
"source": {
"type": "s3folder",
"bucket": "<bucket>",
"path": "<new_path>",
},
}
)
)
# wait for update to finish
tracker = DefaultProgressTracking(timeout=18000)
try:
wait_progress(response, site.session, progress_tracker=tracker)
except TaskError as exc:
print(exc)
API references
Upload and ingest an SAV file
The following example demonstrates how to ingest an SAV-based dataset into Crunch Lakehouse. This process involves four main steps:
- Create a datasource. Only the name is required and it can be used for identification.
- Request an upload link.
- Upload the SAV file.
- Ingest the dataset.
Additionally, the code waits for the ingestion process to complete and prints any errors that may occur during the process.
import os
import requests
import pycrunch
from pycrunch.shoji import as_entity, wait_progress, as_value, TaskError
from pycrunch.progress import DefaultProgressTracking
site = pycrunch.connect(
api_key="<crunch_api_key>",
site_url="<crunch_api_url>",
)
# the first step is to create a datasource,
# as it represents a dataset in the lake.
datasource = site.datasources.create(
as_entity(dict(name="<name>"))
)
# print datasource url
print(datasource.self)
# get datasource entity
entity = site.follow("datasources", {"name": "<name>"})
entity = entity.index[datasource.self].entity
# get file upload links to POST the SAV file to
# the upload targets API accepts multiple files for other types of upload
# but only one file is needed for SAV uploads.
sav_path = "<path to sav file>"
sav_name = os.path.basename(sav_path)
response = datasource.upload_targets.post(
as_value({"filenames": [sav_name], "options": {}})
)
payload = response.json()["value"]
revision = payload["revision"]
# map the uploaded filename to its local path
files = {sav_name: sav_path}
# Upload the sav file
for key, val in payload["file_paths"].items():
with open(files[key], "rb") as f:
file = {"file": (key, f)}
requests.post(val["url"], data=val["fields"], files=file)
# Ingest the dataset
response = datasource.ingest.post(
as_value(
{"revision_id": revision}
)
)
# wait for ingestion to finish
tracker = DefaultProgressTracking(timeout=18000)
try:
wait_progress(response, site.session, progress_tracker=tracker)
except TaskError as exc:
print(exc)
API references