The Crunch Lakehouse supports multiple ways to export data. It is possible to:
- Create a single data file download
- Export a Crunch Dataset
- Export a Lakehouse datasource
Single Data File Downloads
A single data file download provides a simple mechanism for exporting small datasets in an easy-to-use form.
When you use the single data file download approach, you are provided time-limited URLs to download the data file (Parquet) and the corresponding schema and metadata files (JSON).
The URLs expire after 1 hour.
Downloads are limited to datasets with fewer than 50 million respondent answers (rows × columns).
Both Crunch Datasets and Lakehouse Datasources are supported. Understanding the difference:
- Crunch Datasets
Datasets in the Crunch Data Platform are backed by the Crunch analysis engine and are the primary form of survey data in the Crunch Data Platform (CDP). Most data today is a CDP dataset, so this is the most likely way to download data. The URLs for datasets look like:
"https://your-domain.crunch.io/api/datasets/<dataset id>"
- Lakehouse Datasources
Data ingested directly into the Lakehouse, or otherwise synchronized to the Lakehouse from the Crunch Data Platform, is represented as Datasources. The URLs for datasources look like:
"https://your-domain.crunch.io/api/datasources/<datasource id>"
When using the download API, you may specify "dataset" or "datasource" with the corresponding URL for the download.
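For example, the body of the download request references exactly one of the two. A minimal sketch (the placeholder URLs are illustrative; the full call appears in the Python example below):

# download a Crunch Dataset...
body = {"dataset": "https://your-domain.crunch.io/api/datasets/<dataset id>/"}

# ...or download a Lakehouse Datasource
body = {"datasource": "https://your-domain.crunch.io/api/datasources/<datasource id>/"}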
API Reference
Python Code Example
import json
import pycrunch
import requests
from pycrunch.progress import DefaultProgressTracking
from pycrunch.shoji import as_entity, wait_progress, TaskError

api_key = "<crunch_api_key>"
site = pycrunch.connect(
    api_key=api_key,
    site_url="<crunch_api_url>",
)
token = f"Bearer {api_key}"
dataset_id = "<Crunch Dataset ID>"

# trigger the single data file download for a Crunch Dataset
response = site.datasources.download.post(
    as_entity(
        {
            "dataset": f"{site.self}/datasets/{dataset_id}/",
        }
    )
)

# wait for the download to be prepared
tracker = DefaultProgressTracking(timeout=3600)
try:
    wait_progress(response, site.session, progress_tracker=tracker)
    workflow_url = response.json()["value"]
    response = requests.get(url=workflow_url, headers={"Authorization": token})
    links = response.json()["value"]["result"]
    for k, v in links.items():
        print(k)
        print(v)
except TaskError as exc:
    print(exc)
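The result above is a mapping whose values are time-limited URLs for the Parquet data file and the JSON schema and metadata files. As a follow-up, here is a minimal sketch of saving those files locally; it assumes the dictionary keys are usable as local file names and that the time-limited URLs need no additional authentication, which are illustrative assumptions rather than a documented contract:

import requests

# 'links' is the dict of download URLs returned by the example above
for name, url in links.items():
    resp = requests.get(url)
    resp.raise_for_status()
    # assumption: the key is a reasonable local file name
    with open(name, "wb") as fh:
        fh.write(resp.content)

# the Parquet file can then be read with any Parquet-capable tool,
# for example pandas with pyarrow installed:
# import pandas as pd
# df = pd.read_parquet("<downloaded parquet file>")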
Export a Crunch Dataset
Any dataset in the Crunch Data Platform may be exported using the Lakehouse Export APIs.
The Lakehouse export APIs support S3 folders as a destination. See below for more details about Destinations.
Data may be exported from the Lakehouse in Parquet format (with accompanying schema and metadata).
Exporting data to S3 requires the S3 bucket to have specific permissions configured to allow Crunch to write to the bucket. See S3 Export Permissions below.
API references
Python Code Example
import time
import pycrunch
import requests
from pycrunch.shoji import as_entity

dataset_id = "<Crunch Dataset ID>"
api_key = "<crunch_api_key>"
site = pycrunch.connect(
    api_key=api_key,
    site_url="<crunch_api_url>",
)
token = f"Bearer {api_key}"

# get datadestination entity
destinations = site.follow("datadestinations", {"name": "<destination_name>"})
destination = destinations.index[next(iter(destinations.index))].entity

# trigger a full export of the Crunch Dataset to the destination
response = site.datasources.export.post(
    as_entity(
        {
            "dataset": f"{site.self}/datasets/{dataset_id}/",
            "options": {
                "export_type": "full",
                "path": "<destination subfolder>",
            },
            "datadestination": destination.self,
        }
    )
)

workflow_url = response.json()["value"]["progress"]
print(f"Export triggered for Datasource {response.json()['value']['datasource']} workflow {workflow_url}")

# poll the workflow until the export completes
while True:
    response = requests.get(url=workflow_url, headers={"Authorization": token})
    progress = response.json()["value"]["progress"]
    if progress == -1:
        print(f"error: export failed {workflow_url}")
        quit()
    if progress == 100:
        break
    time.sleep(10)

print("export completed")
Export a Lakehouse Datasource
Data that has been ingested directly into the Lakehouse, or synchronized with the Lakehouse, may be exported using a specific datasource export API (see the Python Code Example for a Datasource Export below).
The Lakehouse export APIs support S3 folders as a destination. See below for more details about Destinations.
Data may be exported from the Lakehouse in Parquet format (with accompanying schema and metadata).
Exporting data to S3 requires the S3 bucket to have specific permissions configured to allow Crunch to write to the bucket. See S3 Export Permissions below.
API references
Destinations
Lakehouse Export APIs leverage an extensible destination API to support many platforms for exports.
Currently, AWS S3 folders are the only supported destination type but this will soon expand to other cloud provider storage systems.
Exporting data requires creating a DataDestination, which defines the location to which data is exported and a name for the destination. The DataDestination type for S3 is s3folder.
It is not necessary to create a DataDestination for every export; DataDestinations may be reused. A DataDestination can specify a base folder, and the export API allows exports to subfolders.
This provides great flexibility: named destinations can be meaningful to your workflows while still allowing many exports in a controlled way.
You may design your destinations and reuse them as you see fit. Common patterns are:
- Have one destination per S3 bucket
- Have one destination and base folder (top level folder) per geographical region or product
Each Destination has a human-readable name, which can be found by listing the destinations through the API (a short listing sketch follows the creation example below).
Python Code Example
import json
import pycrunch
from pycrunch.shoji import as_entity

site = pycrunch.connect(
    api_key="<crunch_api_key>",
    site_url="<crunch_api_url>",
)

# get datasource entity
datasources = site.follow("datasources", {"name": "<name>"})
datasource = datasources.index[next(iter(datasources.index))].entity

# create data destination
destination = site.datadestinations.create(
    as_entity(
        {
            "name": "<destination_name>",
            "data": {
                "type": "s3folder",
                "bucket": "<your-dest-bucket>",
                "base": "<your-dest-folder>",  # optional
            },
            "notifications": [
                {
                    "event": {"action": "update"},
                    "notify": {"type": "sqs", "queue_url": "<your-sqs-queue-url>"},
                }
            ],
            "datasources": [datasource.self],
        }
    )
)
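Existing destinations can also be listed through the API and looked up by their human-readable name. A minimal sketch (whether each catalog entry exposes the name directly is an assumption here; the filtered lookup matches the export examples in this guide):

import pycrunch

site = pycrunch.connect(
    api_key="<crunch_api_key>",
    site_url="<crunch_api_url>",
)

# list all destinations visible to this account
destinations = site.follow("datadestinations")
for url, entry in destinations.index.items():
    # assumption: each catalog entry carries the destination name
    print(url, entry["name"])

# or look one up by its human-readable name, as in the export examples
destinations = site.follow("datadestinations", {"name": "<destination_name>"})
destination = destinations.index[next(iter(destinations.index))].entity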
API references
Destination Notifications
DataDestinations provide notifications for a datasource on various events. When enabled, a message is sent to the specified SQS queue each time an update occurs for the associated datasources.
Typically, Update Notifications are used to trigger exports; see Ingesting & Updating Data for information on Update Notifications. A sketch of a simple SQS consumer follows the code example below.
Code Example
import json
import pycrunch
from pycrunch.shoji import as_entity

site = pycrunch.connect(
    api_key="<crunch_api_key>",
    site_url="<crunch_api_url>",
)

# get datasource entity
datasources = site.follow("datasources", {"name": "<name>"})
datasource = datasources.index[next(iter(datasources.index))].entity

# create data destination with an SQS update notification
destination = site.datadestinations.create(
    as_entity(
        {
            "name": "<destination_name>",
            "data": {
                "type": "s3folder",
                "bucket": "<your-dest-bucket>",
                "base": "<your-dest-folder>",  # optional
            },
            "notifications": [
                {
                    "event": {"action": "update"},
                    "notify": {"type": "sqs", "queue_url": "<your-sqs-queue-url>"},
                }
            ],
            "datasources": [datasource.self],
        }
    )
)
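Once update notifications are enabled, a common pattern is a small worker that polls the SQS queue and triggers an export whenever a message arrives. A minimal sketch using boto3 (the queue URL and region are your own values, and the message payload format is not described here; see Ingesting & Updating Data):

import boto3

sqs = boto3.client("sqs", region_name="<your-aws-region>")
queue_url = "<your-sqs-queue-url>"

# poll the queue for update notifications
messages = sqs.receive_message(
    QueueUrl=queue_url,
    MaxNumberOfMessages=10,
    WaitTimeSeconds=20,
).get("Messages", [])

for message in messages:
    # the payload format is documented under Ingesting & Updating Data
    print(message["Body"])
    # ...trigger a datasource export here, as in the
    # "Python Code Example for a Datasource Export" below...
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"])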
SQS permissions
Any SQS queue you provide for notifications must allow the Crunch Lakehouse to post new messages to the queue. Here is an example policy to allow that:
{
    "Version": "2012-10-17",
    "Id": "__default_policy_ID",
    "Statement": [
        {
            "Sid": "__owner_statement",
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "arn:aws:iam::<your account id>:root",
                    "arn:aws:iam::993035761779:role/eks-1-prod-eu-west-1-webserver-sa-role",
                    "arn:aws:iam::993035761779:root"
                ]
            },
            "Action": "SQS:*",
            "Resource": "arn:aws:sqs:eu-west-1:<your account id>:<your sqs queue name>"
        },
        {
            "Sid": "__sender_statement",
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "arn:aws:iam::993035761779:role/eks-1-prod-eu-west-1-webserver-sa-role",
                    "arn:aws:iam::993035761779:root",
                    "arn:aws:iam::<your account id>:root",
                    "arn:aws:iam::<your account id>:user/<your iam user name>"
                ]
            },
            "Action": "SQS:SendMessage",
            "Resource": "arn:aws:sqs:eu-west-1:<your account id>:<your sqs queue name>"
        },
        {
            "Sid": "__receiver_statement",
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "arn:aws:iam::993035761779:role/eks-1-prod-eu-west-1-webserver-sa-role",
                    "arn:aws:iam::993035761779:root",
                    "arn:aws:iam::<your account id>:root",
                    "arn:aws:iam::<your account id>:user/<your iam user name>"
                ]
            },
            "Action": [
                "SQS:ChangeMessageVisibility",
                "SQS:DeleteMessage",
                "SQS:ReceiveMessage"
            ],
            "Resource": "arn:aws:sqs:eu-west-1:<your account id>:<your sqs queue name>"
        }
    ]
}
Correspondingly, Crunch needs to configure its systems to be aware of the SQS queue. To receive notifications through SQS, you must provide Crunch with the ARN of the SQS queue.
S3 Export Permissions
When exporting to an S3 bucket, the Crunch platform must have access to the destination bucket (or folder).
A bucket policy must be attached to the destination bucket that allows ListObjects, ListObjectsV2, GetObject, and PutObject permissions for AWS account 993035761779 and, if needed, the role arn:aws:iam::993035761779:role/eks-1-prod-eu-west-1-crunch-lake-consumer-service-account-role.
Here's an example policy:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Deny",
            "Principal": {
                "AWS": "*"
            },
            "Action": "s3:*",
            "Resource": [
                "arn:aws:s3:::<your bucket>",
                "arn:aws:s3:::<your bucket>/*"
            ],
            "Condition": {
                "Bool": {
                    "aws:SecureTransport": "false"
                }
            }
        },
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::993035761779:role/eks-1-prod-eu-west-1-crunch-lake-consumer-service-account-role"
            },
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::<your bucket>/*"
        },
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::993035761779:role/eks-1-prod-eu-west-1-crunch-lake-consumer-service-account-role"
            },
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::<your bucket>/*"
        },
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::993035761779:role/eks-1-prod-eu-west-1-crunch-lake-consumer-service-account-role"
            },
            "Action": "s3:ListBucket",
            "Resource": "arn:aws:s3:::<your bucket>"
        }
    ]
}
Python Code Example for a Datasource Export
This process allows users to export data from the Crunch Lakehouse to a specified DataDestination.
Crunch currently supports two types of exports:
- delta: Exports only the data that has changed between the specified version and the previous one.
- full: Exports the entire dataset.
import json
import time

import pycrunch
from pycrunch.progress import DefaultProgressTracking
from pycrunch.shoji import as_entity, wait_progress, TaskError

site = pycrunch.connect(
    api_key="<crunch_api_key>",
    site_url="<crunch_api_url>",
)

# get datasource entity
datasources = site.follow("datasources", {"name": "<name>"})
datasource = datasources.index[next(iter(datasources.index))].entity

# get datadestination entity
destinations = site.follow("datadestinations", {"name": "<destination_name>"})
destination = destinations.index[next(iter(destinations.index))].entity

# trigger export
response = datasource.export_data.post(
    as_entity(
        {
            "datadestination": destination.self,
            "export_type": "delta",
            "path": str(int(time.time())),  # optional
            "version": "<version>",  # optional, if null the latest will be used
        }
    )
)

# wait for export to finish
tracker = DefaultProgressTracking(timeout=3600)
try:
    wait_progress(response, site.session, progress_tracker=tracker)
except TaskError as exc:
    print(exc)
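Once the export finishes, the Parquet, schema, and metadata files are written to the destination bucket under the base folder and the path supplied in the export request. A minimal sketch of listing the exported objects with boto3 (this assumes your local AWS credentials can read the destination bucket; the bucket and folder placeholders are the values you configured):

import boto3

s3 = boto3.client("s3")

# list the objects written under <base folder>/<export path>
prefix = "<your-dest-folder>/<export path>"
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket="<your-dest-bucket>", Prefix=prefix):
    for obj in page.get("Contents", []):
        print(obj["Key"], obj["Size"])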