commit d27096896be45e49777ff75fcf82bb9c24048a0c
Author: kjuulh
Date:   Fri Jan 31 23:59:21 2025 +0100

    feat: add basic ingest from spark iceberg example

diff --git a/.drone.yml b/.drone.yml
new file mode 100644
index 0000000..b891f02
--- /dev/null
+++ b/.drone.yml
@@ -0,0 +1,2 @@
+kind: template
+load: cuddle-empty-plan.yaml
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..75d1871
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+.cuddle/
diff --git a/cuddle.yaml b/cuddle.yaml
new file mode 100644
index 0000000..d6a17b0
--- /dev/null
+++ b/cuddle.yaml
@@ -0,0 +1,16 @@
+# yaml-language-server: $schema=https://git.front.kjuulh.io/kjuulh/cuddle/raw/branch/main/schemas/base.json
+
+base: "git@git.front.kjuulh.io:kjuulh/cuddle-empty-plan.git"
+
+vars:
+  service: "data-platform-in-a-box"
+  registry: kasperhermansen
+
+please:
+  project:
+    owner: kjuulh
+    repository: "data-platform-in-a-box"
+    branch: main
+  settings:
+    api_url: https://git.front.kjuulh.io
+
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..154d34e
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,212 @@
+services:
+  redpanda:
+    image: redpandadata/redpanda:v24.3.4
+    container_name: redpanda
+    ports:
+      - "9092:9092"
+      - "8081:8081"
+      - "8082:8082"
+      - "29092:29092"
+    command:
+      - redpanda
+      - start
+      - --overprovisioned
+      - --smp
+      - "1"
+      - --memory
+      - "1G"
+      - --reserve-memory
+      - "0M"
+      - --node-id
+      - "0"
+      - --kafka-addr
+      - PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
+      - --advertise-kafka-addr
+      - PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
+      - --check=false
+
+  connect:
+    image: confluentinc/cp-kafka-connect-base:7.8.0
+    depends_on:
+      - redpanda
+    hostname: connect
+    container_name: connect
+    ports:
+      - 8083:8083
+    environment:
+      CONNECT_BOOTSTRAP_SERVERS: 'redpanda:29092'
+      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
+      CONNECT_REST_PORT: 8083
+      CONNECT_GROUP_ID: connect-cluster-group
+      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-configs
+      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-offsets
+      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-status
+      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
+      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
+      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
+      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
+      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.StringConverter"
+      CONNECT_INTERNAL_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
+      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
+      CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
+      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
+      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
+      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
+      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
+      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
+      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/,/connectors/'
+      AWS_ACCESS_KEY_ID: "minioadmin"
+      AWS_SECRET_ACCESS_KEY: "minioadmin"
+    command:
+      - bash
+      - -c
+      - |
+        #
+        echo "Installing connector plugins"
+        confluent-hub install --no-prompt tabular/iceberg-kafka-connect:0.4.11
+        #
+        echo "Launching Kafka Connect worker"
+        /etc/confluent/docker/run &
+        #
+        echo "Waiting for Kafka Connect to start listening on localhost ⏳"
+        while : ; do
+          curl_status=$$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
+          echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)"
+          if [ $$curl_status -eq 200 ] ; then
+            break
+          fi
+          sleep 5
+        done
+        echo -e "\n--\n+> Creating connector"
+        curl -X PUT \
+          -H 'Content-Type: application/json' \
+          -H 'Accept: application/json' http://localhost:8083/connectors/IcebergSinkConnector/config \
+          -d '{
+            "tasks.max": "1",
+            "topics": "payments",
+            "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
+            "iceberg.catalog.s3.endpoint": "http://minio:9000",
+            "iceberg.catalog.s3.secret-access-key": "minioadmin",
+            "iceberg.catalog.s3.access-key-id": "minioadmin",
+            "iceberg.catalog.s3.path-style-access": "true",
+            "iceberg.catalog.uri": "http://rest:8181",
+            "iceberg.catalog.warehouse": "s3://warehouse/",
+            "iceberg.catalog.client.region": "eu-west-1",
+            "iceberg.catalog.type": "rest",
+            "iceberg.control.commitIntervalMs": "1000",
+            "iceberg.tables": "orders.payments",
+            "value.converter.schemas.enable": "false",
+            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+            "schemas.enable": "false"
+          }'
+        sleep infinity
+
+  console:
+    image: redpandadata/console:v2.1.1
+    entrypoint: /bin/sh
+    command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console"
+    environment:
+      CONFIG_FILEPATH: /tmp/config.yml
+      CONSOLE_CONFIG_FILE: |
+        kafka:
+          brokers: ["redpanda:29092"]
+          schemaRegistry:
+            enabled: true
+            urls: ["http://redpanda:8081"]
+        connect:
+          enabled: true
+          clusters:
+            - name: local-connect-cluster
+              url: http://connect:8083
+        redpanda:
+          adminApi:
+            enabled: true
+            urls: ["http://redpanda:9644"]
+    ports:
+      - 18080:8080
+    depends_on:
+      - redpanda
+      - connect
+
+  minio:
+    image: minio/minio
+    hostname: minio
+    container_name: minio
+    environment:
+      - MINIO_ROOT_USER=minioadmin
+      - MINIO_ROOT_PASSWORD=minioadmin
+      - MINIO_DOMAIN=minio
+    networks:
+      default:
+        aliases:
+          - warehouse.minio
+    ports:
+      - 9001:9001
+      - 9000:9000
+    command: ["server", "/data", "--console-address", ":9001"]
+
+  aws:
+    image: amazon/aws-cli
+    container_name: aws-cli
+    command: |
+      -c "sleep 2 && \
+      aws --endpoint-url http://minio:9000 s3 mb s3://warehouse --region eu-west-1 || exit 0"
+    entrypoint: [/bin/bash]
+    environment:
+      AWS_ACCESS_KEY_ID: "minioadmin"
+      AWS_SECRET_ACCESS_KEY: "minioadmin"
+    depends_on:
+      - minio
+
+  spark-iceberg:
+    image: tabulario/spark-iceberg
+    hostname: spark-iceberg
+    container_name: spark-iceberg
+    build: spark/
+    depends_on:
+      - rest
+      - minio
+    environment:
+      AWS_ACCESS_KEY_ID: minioadmin
+      AWS_SECRET_ACCESS_KEY: minioadmin
+      AWS_REGION: eu-west-1
+      SPARK_DEFAULTS: |
+        spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
+        spark.sql.catalog.iceberg org.apache.iceberg.spark.SparkCatalog
+        spark.sql.catalog.iceberg.catalog-impl org.apache.iceberg.rest.RESTCatalog
+        spark.sql.catalog.iceberg.uri http://rest:8181
+        spark.sql.catalog.iceberg.io-impl org.apache.iceberg.aws.s3.S3FileIO
+        spark.sql.catalog.iceberg.warehouse s3://warehouse/wh/
+        spark.sql.catalog.iceberg.s3.endpoint http://minio:9000
+        spark.sql.catalog.iceberg.s3.path-style-access true
+        spark.sql.defaultCatalog iceberg
+        spark.sql.catalogImplementation in-memory
+        spark.eventLog.enabled true
+        spark.eventLog.dir /home/iceberg/spark-events
+        spark.history.fs.logDirectory /home/iceberg/spark-events
+        spark.jars.packages org.apache.hadoop:hadoop-aws:3.2.0
+    ports:
+      - 8888:8888
+      - 8080:8080
+      - 10000:10000
+      - 10001:10001
+    volumes:
+      - ./spark:/home/iceberg/scripts
+      - ./notebooks:/home/iceberg/notebooks/notebooks
+    command: ["echo \"$$SPARK_DEFAULTS\" > /opt/spark/conf/spark-defaults.conf && spark-submit /home/iceberg/scripts/create_table.py && notebook"]
+
+  rest:
+    image: tabulario/iceberg-rest
+    hostname: rest
+    container_name: rest
+    ports:
+      - 8181:8181
+    environment:
+      - AWS_ACCESS_KEY_ID=minioadmin
+      - AWS_SECRET_ACCESS_KEY=minioadmin
+      - AWS_REGION=eu-west-1
+      - CATALOG_WAREHOUSE=s3://warehouse/
+      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
+      - CATALOG_S3_ENDPOINT=http://minio:9000
+      - CATALOG_S3_PATH__STYLE__ACCESS=True
diff --git a/notebooks/.ipynb_checkpoints/iceberg-checkpoint.ipynb b/notebooks/.ipynb_checkpoints/iceberg-checkpoint.ipynb
new file mode 100644
index 0000000..2452242
--- /dev/null
+++ b/notebooks/.ipynb_checkpoints/iceberg-checkpoint.ipynb
@@ -0,0 +1,103 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "29f8d24e-e4bf-484d-afd4-cb82ff6cd50d",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%sql\n",
+    "\n",
+    "SHOW DATABASES"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "70349765-e5f1-43a5-a141-cc2d54c69a58",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%sql\n",
+    "\n",
+    "SHOW TABLES FROM orders"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "fabaed9c-9049-4996-9d26-b20f66303911",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%sql\n",
+    "\n",
+    "SHOW TBLPROPERTIES orders.payments"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "6317d9c6-140e-4a63-890e-2173fbb9503e",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%sql\n",
+    "\n",
+    "SELECT COUNT(*)\n",
+    "FROM orders.payments"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "2a1ff132-dc65-4943-a9be-416ba5a13c26",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%sql\n",
+    "\n",
+    "SELECT *\n",
+    "FROM orders.payments\n",
+    "LIMIT 10"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "a2688a95-594c-45ad-9d49-70a1bcd59a1b",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%sql\n",
+    "\n",
+    "SELECT * \n",
+    "FROM orders.payments.partitions\n",
+    "ORDER BY record_count DESC\n",
+    "LIMIT 10"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3 (ipykernel)",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.9.17"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/notebooks/iceberg.ipynb b/notebooks/iceberg.ipynb
new file mode 100644
index 0000000..2452242
--- /dev/null
+++ b/notebooks/iceberg.ipynb
@@ -0,0 +1,103 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "29f8d24e-e4bf-484d-afd4-cb82ff6cd50d",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%sql\n",
+    "\n",
+    "SHOW DATABASES"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "70349765-e5f1-43a5-a141-cc2d54c69a58",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%sql\n",
+    "\n",
+    "SHOW TABLES FROM orders"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "fabaed9c-9049-4996-9d26-b20f66303911",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%sql\n",
+    "\n",
+    "SHOW TBLPROPERTIES orders.payments"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "6317d9c6-140e-4a63-890e-2173fbb9503e",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%sql\n",
+    "\n",
+    "SELECT COUNT(*)\n",
+    "FROM orders.payments"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "2a1ff132-dc65-4943-a9be-416ba5a13c26",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%sql\n",
+    "\n",
+    "SELECT *\n",
+    "FROM orders.payments\n",
+    "LIMIT 10"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "a2688a95-594c-45ad-9d49-70a1bcd59a1b",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%sql\n",
+    "\n",
+    "SELECT * \n",
+    "FROM orders.payments.partitions\n",
+    "ORDER BY record_count DESC\n",
+    "LIMIT 10"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3 (ipykernel)",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.9.17"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/renovate.json b/renovate.json
new file mode 100644
index 0000000..7190a60
--- /dev/null
+++ b/renovate.json
@@ -0,0 +1,3 @@
+{
+  "$schema": "https://docs.renovatebot.com/renovate-schema.json"
+}
diff --git a/spark/create_table.py b/spark/create_table.py
new file mode 100644
index 0000000..9252574
--- /dev/null
+++ b/spark/create_table.py
@@ -0,0 +1,20 @@
+from pyspark.sql import SparkSession
+
+spark = SparkSession.builder.appName("").getOrCreate()
+
+print("creating database")
+spark.sql('CREATE DATABASE IF NOT EXISTS orders')
+
+print("creating table")
+spark.sql('''
+    CREATE TABLE IF NOT EXISTS orders.payments (
+        id STRING,
+        type STRING,
+        created_at TIMESTAMP,
+        document STRING,
+        payer STRING,
+        amount INT
+    )
+    USING iceberg
+    PARTITIONED BY (document)
+''')