feat: add basic ingest from spark iceberg example

Kasper Juul Hermansen 2025-01-31 23:59:21 +01:00
commit d27096896b
Signed by: kjuulh
SSH Key Fingerprint: SHA256:RjXh0p7U6opxnfd3ga/Y9TCo18FYlHFdSpRIV72S/QM
8 changed files with 460 additions and 0 deletions

2
.drone.yml Normal file

@@ -0,0 +1,2 @@
kind: template
load: cuddle-empty-plan.yaml

1
.gitignore vendored Normal file

@@ -0,0 +1 @@
.cuddle/

16
cuddle.yaml Normal file

@@ -0,0 +1,16 @@
# yaml-language-server: $schema=https://git.front.kjuulh.io/kjuulh/cuddle/raw/branch/main/schemas/base.json
base: "git@git.front.kjuulh.io:kjuulh/cuddle-empty-plan.git"
vars:
service: "data-platform-in-a-box"
registry: kasperhermansen
please:
project:
owner: kjuulh
repository: "data-platform-in-a-box"
branch: main
settings:
api_url: https://git.front.kjuulh.io

212
docker-compose.yml Normal file

@@ -0,0 +1,212 @@
services:
redpanda:
image: redpandadata/redpanda:v24.3.4
container_name: redpanda
ports:
- "9092:9092"
- "8081:8081"
- "8082:8082"
- "29092:29092"
command:
- redpanda
- start
- --overprovisioned
- --smp
- "1"
- --memory
- "1G"
- --reserve-memory
- "0M"
- --node-id
- "0"
- --kafka-addr
- PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
- --advertise-kafka-addr
- PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
- --check=false
connect:
image: confluentinc/cp-kafka-connect-base:7.8.0
depends_on:
- redpanda
hostname: connect
container_name: connect
ports:
- 8083:8083
environment:
CONNECT_BOOTSTRAP_SERVERS: 'redpanda:29092'
CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: connect-cluster-group
CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-status
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
CONNECT_INTERNAL_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/,/connectors/'
AWS_ACCESS_KEY_ID: "minioadmin"
AWS_SECRET_ACCESS_KEY: "minioadmin"
command:
- bash
- -c
- |
#
echo "Installing connector plugins"
confluent-hub install --no-prompt tabular/iceberg-kafka-connect:0.4.11
#
echo "Launching Kafka Connect worker"
/etc/confluent/docker/run &
#
echo "Waiting for Kafka Connect to start listening on localhost ⏳"
while : ; do
curl_status=$$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)"
if [ $$curl_status -eq 200 ] ; then
break
fi
sleep 5
done
echo -e "\n--\n+> Creating connector"
curl -X PUT \
-H 'Content-Type: application/json' \
-H 'Accept: application/json' http://localhost:8083/connectors/IcebergSinkConnector/config \
-d '{
"tasks.max": "1",
"topics": "payments",
"connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
"iceberg.catalog.s3.endpoint": "http://minio:9000",
"iceberg.catalog.s3.secret-access-key": "minioadmin",
"iceberg.catalog.s3.access-key-id": "minioadmin",
"iceberg.catalog.s3.path-style-access": "true",
"iceberg.catalog.uri": "http://rest:8181",
"iceberg.catalog.warehouse": "s3://warehouse/",
"iceberg.catalog.client.region": "eu-west-1",
"iceberg.catalog.type": "rest",
"iceberg.control.commitIntervalMs": "1000",
"iceberg.tables": "orders.payments",
"value.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"schemas.enable": "false"
}'
sleep infinity
console:
image: redpandadata/console:v2.1.1
entrypoint: /bin/sh
command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console"
environment:
CONFIG_FILEPATH: /tmp/config.yml
CONSOLE_CONFIG_FILE: |
kafka:
brokers: ["redpanda:29092"]
schemaRegistry:
enabled: true
urls: ["http://redpanda:8081"]
connect:
enabled: true
clusters:
- name: local-connect-cluster
url: http://connect:8083
redpanda:
adminApi:
enabled: true
urls: ["http://redpanda:9644"]
ports:
- 18080:8080
depends_on:
- redpanda
- connect
minio:
image: minio/minio
hostname: minio
container_name: minio
environment:
- MINIO_ROOT_USER=minioadmin
- MINIO_ROOT_PASSWORD=minioadmin
- MINIO_DOMAIN=minio
networks:
default:
aliases:
- warehouse.minio
ports:
- 9001:9001
- 9000:9000
command: ["server", "/data", "--console-address", ":9001"]
aws:
image: amazon/aws-cli
container_name: aws-cli
command: |
-c "sleep 2 && \
aws --endpoint-url http://minio:9000 s3 mb s3://warehouse --region eu-west-1 || exit 0"
entrypoint: [/bin/bash]
environment:
AWS_ACCESS_KEY_ID: "minioadmin"
AWS_SECRET_ACCESS_KEY: "minioadmin"
depends_on:
- minio
spark-iceberg:
image: tabulario/spark-iceberg
hostname: spark-iceberg
container_name: spark-iceberg
build: spark/
depends_on:
- rest
- minio
environment:
AWS_ACCESS_KEY_ID: minioadmin
AWS_SECRET_ACCESS_KEY: minioadmin
AWS_REGION: eu-west-1
SPARK_DEFAULTS: |
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.iceberg org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg.catalog-impl org.apache.iceberg.rest.RESTCatalog
spark.sql.catalog.iceberg.uri http://rest:8181
spark.sql.catalog.iceberg.io-impl org.apache.iceberg.aws.s3.S3FileIO
spark.sql.catalog.iceberg.warehouse s3://warehouse/wh/
spark.sql.catalog.iceberg.s3.endpoint http://minio:9000
spark.sql.catalog.iceberg.s3.path-style-access true
spark.sql.defaultCatalog iceberg
spark.sql.catalogImplementation in-memory
spark.eventLog.enabled true
spark.eventLog.dir /home/iceberg/spark-events
spark.history.fs.logDirectory /home/iceberg/spark-events
spark.jars.packages org.apache.hadoop:hadoop-aws:3.2.0
ports:
- 8888:8888
- 8080:8080
- 10000:10000
- 10001:10001
volumes:
- ./spark:/home/iceberg/scripts
- ./notebooks:/home/iceberg/notebooks/notebooks
command: ["echo \"$$SPARK_DEFAULTS\" > /opt/spark/conf/spark-defaults.conf && spark-submit /home/iceberg/scripts/create_table.py && notebook"]
rest:
image: tabulario/iceberg-rest
hostname: rest
container_name: rest
ports:
- 8181:8181
environment:
- AWS_ACCESS_KEY_ID=minioadmin
- AWS_SECRET_ACCESS_KEY=minioadmin
- AWS_REGION=eu-west-1
- CATALOG_WAREHOUSE=s3://warehouse/
- CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
- CATALOG_S3_ENDPOINT=http://minio:9000
- CATALOG_S3_PATH__STYLE__ACCESS=True
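
A quick way to exercise this stack end to end (a sketch, not part of the commit): produce a JSON record to the payments topic and let the Iceberg sink commit it to orders.payments. The container name, topic, and field names come from this compose file and spark/create_table.py; the payload values are invented, and the timestamp format may need adjusting to whatever the sink coerces into Iceberg's TIMESTAMP type.

# assumes the stack is running: docker compose up -d
# rpk inside the redpanda container talks to localhost:9092, which this broker advertises
docker exec redpanda rpk topic create payments
echo '{"id":"p-1","type":"card","created_at":"2025-01-31T12:00:00Z","document":"invoice-1","payer":"alice","amount":100}' \
  | docker exec -i redpanda rpk topic produce payments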

(file name not shown) Normal file

@@ -0,0 +1,103 @@
(content identical to notebooks/iceberg.ipynb below)

103
notebooks/iceberg.ipynb Normal file

@@ -0,0 +1,103 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "29f8d24e-e4bf-484d-afd4-cb82ff6cd50d",
"metadata": {},
"outputs": [],
"source": [
"%%sql\n",
"\n",
"SHOW DATABASES"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "70349765-e5f1-43a5-a141-cc2d54c69a58",
"metadata": {},
"outputs": [],
"source": [
"%%sql\n",
"\n",
"SHOW TABLES FROM orders"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "fabaed9c-9049-4996-9d26-b20f66303911",
"metadata": {},
"outputs": [],
"source": [
"%%sql\n",
"\n",
"SHOW TBLPROPERTIES orders.payments"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6317d9c6-140e-4a63-890e-2173fbb9503e",
"metadata": {},
"outputs": [],
"source": [
"%%sql\n",
"\n",
"SELECT COUNT(*)\n",
"FROM orders.payments"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2a1ff132-dc65-4943-a9be-416ba5a13c26",
"metadata": {},
"outputs": [],
"source": [
"%%sql\n",
"\n",
"SELECT *\n",
"FROM orders.payments\n",
"LIMIT 10"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a2688a95-594c-45ad-9d49-70a1bcd59a1b",
"metadata": {},
"outputs": [],
"source": [
"%%sql\n",
"\n",
"SELECT * \n",
"FROM orders.payments.partitions\n",
"ORDER BY record_count DESC\n",
"LIMIT 10"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.17"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
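
These cells are meant to run in the Jupyter server started by the spark-iceberg service; per the compose file the UI is published on host port 8888, with this notebook available under the mounted notebooks directory. A minimal reachability check, assuming default settings:

# the Jupyter UI should answer on the published port once spark-iceberg is up
curl -s -o /dev/null -w '%{http_code}\n' http://localhost:8888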

3
renovate.json Normal file

@@ -0,0 +1,3 @@
{
"$schema": "https://docs.renovatebot.com/renovate-schema.json"
}

20
spark/create_table.py Normal file

@@ -0,0 +1,20 @@
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("").getOrCreate()

print("creating database")
spark.sql('CREATE DATABASE IF NOT EXISTS orders')

print("creating table")
spark.sql('''
CREATE TABLE IF NOT EXISTS orders.payments (
    id STRING,
    type STRING,
    created_at TIMESTAMP,
    document STRING,
    payer STRING,
    amount INT
)
USING iceberg
PARTITIONED BY (document)
''')
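
In the compose file this script runs once via spark-submit before the notebook server starts. If the database and table ever need to be recreated by hand, something along these lines should work against the running stack (a sketch; the container name and script path are the ones the compose file uses):

# re-run the table bootstrap inside the spark container
docker exec spark-iceberg spark-submit /home/iceberg/scripts/create_table.py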