URL: https://dzone.com/articles/snowflake-ingestion-using-cockroachdb-and-redpanda

Tour of Snowflake Ingestion Using CockroachDB and Redpanda Connect

This is an exploratory article delving into various ways to ingest real-time data into Snowflake and the levels of difficulty and effort required.

By Artem Ervits · Aug. 15, 2024 · Tutorial
Motivation

I work with financial services clients, and it's common to encounter a need for streaming changes in the operational data store into a data warehouse or a data lake. A former colleague recently reached out for advice on the fastest and most efficient way to load trade data into Snowflake. I've come up with at least three methods, which I will explore in a follow-up series of articles. However, I've decided to first explore Redpanda Connect, a solution that has recently caught my attention. This is by no means a conclusive guide on how changefeed data must be loaded into Snowflake; we're merely exploring the possibilities here and will discuss the pros and cons in later articles.

CockroachDB changefeeds are an enterprise feature and require a license. In this tutorial, I'm using a free-to-start version of CockroachDB Serverless, which has enterprise changefeeds enabled.

High-Level Steps

  • Deploy a CockroachDB cluster with enterprise changefeeds
  • Deploy Redpanda Connect
  • Deploy Snowflake
  • Verify
  • Conclusion

Step-By-Step Instructions

Deploy a CockroachDB Cluster With Enterprise Changefeeds

Start an instance of CockroachDB or use the managed service.

To enable CDC we need to execute the following commands:

SET CLUSTER SETTING cluster.organization = '<organization name>';

SET CLUSTER SETTING enterprise.license = '<secret>';

SET CLUSTER SETTING kv.rangefeed.enabled = true;


Because I am using CockroachDB Serverless, the above steps are not necessary. You can confirm whether changefeeds are indeed enabled using the following command:

SHOW CLUSTER SETTING kv.rangefeed.enabled;


If the value is false, change it to true.

Generate sample data:

CREATE TABLE office_dogs (
 id INT PRIMARY KEY,
 name STRING);

INSERT INTO office_dogs VALUES
 (1, 'Petee'),
 (2, 'Carl');

UPDATE office_dogs SET name = 'Petee H' WHERE id = 1;


We've populated the table and then updated a record. Let's add more data to make it interesting:

INSERT INTO office_dogs SELECT generate_series(3, 10000), md5(random()::string);
SELECT * FROM office_dogs LIMIT 5;
id,name
1,Petee H
2,Carl
3,6e19280ae649efffa7a58584c7f46032
4,5e4e897f008bb752c8edfa64a3aed356
5,abc0d898318d27f23a43060f89d62e34
SELECT COUNT(*) FROM office_dogs;


Deploy Redpanda Connect

I'm running Redpanda Connect in a Docker Compose file.

docker compose -f compose-redpanda.yaml up -d


The contents of the file are:

services:

  redpanda:
    container_name: redpanda-connect
    hostname: redpanda-connect
    image: docker.redpanda.com/redpandadata/connect
    volumes:
      - ./redpanda/connect.yaml:/connect.yaml
      - /Users/aervits/.ssh/rsa_key.pem:/rsa_key.pem


I will be using the connect.yaml file as the foundation to connect all the components in this article. For more detailed information, you can refer to the documentation provided by Redpanda.

The most basic configuration looks like so:

input:
 stdin: {}

pipeline:
 processors: []

output:
 stdout: {}


Since I'm using CockroachDB input, mine looks like this:

input:
  # CockroachDB Input
  label: ""
  cockroachdb_changefeed:
    dsn: postgresql://<user>:<password>@<cockroachdb-cluster>:<port>/<database>?sslmode=verify-full
    tls:
      skip_cert_verify: true
      #enable_renegotiation: false
      #root_cas: ""
      #root_cas_file: ""
      client_certs: []
    tables: [table_for_cdc] # No default (required)
    cursor_cache: "" # No default (optional)
    auto_replay_nacks: true

pipeline:
  processors: []

output:
  stdout: {}


Leave the pipeline and output as default.

For reference, I'm including the repo with my source code where you can reference the values.

If you have been following along, you may have noticed that I haven't started a changefeed job in CockroachDB. The cockroachdb_changefeed input subscribes to the table directly. Because the output in the connect.yaml file is set to stdout, the change events show up in the container logs, which you can follow with the command docker logs redpanda-connect --follow:

{"primary_key":"[9998]","row":"{\"after\": {\"id\": 9998, \"name\": \"0794a9d1c99e8e47ee4515be6e0d736f\"}}","table":"office_dogs"}
{"primary_key":"[9999]","row":"{\"after\": {\"id\": 9999, \"name\": \"c85a6b38154f7e3085d467d567141d45\"}}","table":"office_dogs"}
{"primary_key":"[10000]","row":"{\"after\": {\"id\": 10000, \"name\": \"aae9e0849fff8f47e0371a4c06fb255b\"}}","table":"office_dogs"}
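
Note that each message is doubly encoded: the outer object is JSON, and its "row" and "primary_key" fields are themselves JSON-encoded strings. As a minimal sketch of how a consumer might unpack one of these messages (the decode_message helper is hypothetical and not part of Redpanda Connect), using a line copied from the log output above:

```python
import json

# One message copied verbatim from the stdout output above.
raw = r'{"primary_key":"[9998]","row":"{\"after\": {\"id\": 9998, \"name\": \"0794a9d1c99e8e47ee4515be6e0d736f\"}}","table":"office_dogs"}'


def decode_message(line: str) -> dict:
    """Hypothetical helper: decode the envelope, then the nested JSON strings."""
    envelope = json.loads(line)
    return {
        "table": envelope["table"],
        # "primary_key" is a JSON array serialized as a string, e.g. "[9998]".
        "primary_key": json.loads(envelope["primary_key"]),
        # "row" is the changefeed payload serialized as a string.
        "row": json.loads(envelope["row"]),
    }


decoded = decode_message(raw)
print(decoded["primary_key"])           # [9998]
print(decoded["row"]["after"]["name"])  # 0794a9d1c99e8e47ee4515be6e0d736f
```

This is the same two-step decoding that any downstream consumer of these messages has to perform before it can read individual columns.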


The next step is to configure Snowflake. We are not going to look at the available processors today.

Deploy Snowflake

I'm using a Snowflake trial account. You get a generous credit which should be sufficient to complete this tutorial.

We need to create a database and a table where we will output the changefeed data.

CREATE OR REPLACE DATABASE FROM_COCKROACH;
CREATE OR REPLACE TABLE OFFICE_DOGS (RECORD variant);


We also need to create a user with key-pair authentication as we're going to be using the Snowpipe service.

openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8


We must use an encrypted key as Redpanda doesn't support unencrypted versions.

Generate a public key:

openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub 


Lastly, generate a pem file from the private key:

openssl pkcs8 -in rsa_key.p8 -out rsa_key.pem


In Snowflake, alter the user to use the key pair generated in the previous step.

ALTER USER username SET rsa_public_key='MIIB...';


We can now populate the connect.yaml file with the required information for the snowflake_put output. This output type is for commercial use and requires a license, but since we're using it for demo purposes, we are able to proceed.

output:
  # Snowflake Output
  label: ""
  snowflake_put:
    account: <snowflake-account>
    user: <user>
    private_key_file: rsa_key.pem
    role: ACCOUNTADMIN
    database: <database>
    warehouse: <warehouse>
    schema: <schema>
    stage: "@%implicit_table_stage_name"
    path: "path"
    upload_parallel_threads: 4
    compression: NONE
    batching:
      count: 10
      period: 3s
      processors:
        - archive:
            format: json_array
    max_in_flight: 1
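
To make the batching settings above more concrete, here is a rough Python illustration of the effect of count: 10 combined with the archive processor in json_array format: messages are grouped into batches, and each batch is written out as a single JSON array. This is only a simulation under simplifying assumptions, not how Redpanda Connect is implemented; the function name is made up, and the 3s period is modelled simply as a final flush of whatever is left over.

```python
import json
from typing import Iterable, Iterator


def batch_as_json_arrays(messages: Iterable[dict], count: int = 10) -> Iterator[str]:
    """Hypothetical helper: group messages and serialize each group as a JSON array."""
    batch = []
    for message in messages:
        batch.append(message)
        if len(batch) == count:
            # A full batch becomes the body of one staged file.
            yield json.dumps(batch)
            batch = []
    if batch:
        # Stand-in for the period-based flush of a partial batch.
        yield json.dumps(batch)


# 25 placeholder messages shaped loosely like the changefeed output.
messages = [{"primary_key": f"[{i}]", "table": "office_dogs"} for i in range(1, 26)]
files = list(batch_as_json_arrays(messages, count=10))

print(len(files))                           # 3
print([len(json.loads(f)) for f in files])  # [10, 10, 5]
```

With this shape in mind, each file that lands in the stage should hold a JSON array of up to ten change events.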


If we restart the compose environment and tail the logs, we can see the following:

level=info msg="Running main config from specified file" @service=benthos benthos_version=v4.32.1 path=/connect.yaml
level=info msg="Listening for HTTP requests at: http://0.0.0.0:4195" @service=benthos
level=info msg="Launching a Redpanda Connect instance, use CTRL+C to close" @service=benthos
level=info msg="Output type snowflake_put is now active" @service=benthos label="" path=root.output
level=info msg="Input type cockroachdb_changefeed is now active" @service=benthos label="" path=root.input


Let's look at the implicit table stage and observe if anything has changed.

list @%office_dogs
| dogs/f2f3cf47-d6bc-46f4-88f2-c82519b67481.json | 1312 | 30f709e4962bae9d10b48565d22e9f32 | Wed, 14 Aug 2024 18:58:43 GMT |
| dogs/f6adbf39-3955-4848-93c3-06f873a88078.json | 1312 | 28be7a619ef1e139599077e977ea130b | Wed, 14 Aug 2024 18:58:13 GMT |
| dogs/f8705606-eb07-400a-9ffe-da6834fa1a30.json | 1296 | 5afbdce0e8929fc38a2eb5e0f12b96d6 | Wed, 14 Aug 2024 18:57:29 GMT |
| dogs/f9e5c01a-7dda-4e76-840d-13b8a1e4946a.json | 1296 | 5480c01f1578f67afe2761c7619e9123 | Wed, 14 Aug 2024 18:57:32 GMT |
| dogs/fad4efe7-3f3f-48bc-bdb4-9f0310abcf4d.json | 1312 | 5942c6e2dbaef5ee257d4a9b8e68827d | Wed, 14 Aug 2024 18:58:04 GMT |


The files are ready to be copied into a table. Let's create a pipe:

CREATE OR REPLACE PIPE FROM_COCKROACH.PUBLIC.cockroach_pipe
  AUTO_INGEST = FALSE
  AS COPY INTO FROM_COCKROACH.PUBLIC.OFFICE_DOGS
  FROM (SELECT * FROM @%office_dogs)
  FILE_FORMAT = (TYPE = JSON COMPRESSION = AUTO STRIP_OUTER_ARRAY = TRUE);
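
The STRIP_OUTER_ARRAY option matters because each staged file contains a JSON array of change events. As a small Python analogy of its effect (the helper below is hypothetical and only mimics the row-splitting behaviour, not Snowflake's actual loader), compare loading the same file with and without the option:

```python
import json


def rows_from_staged_file(contents: str, strip_outer_array: bool = True) -> list:
    """Hypothetical helper: return the values that would each become one row."""
    parsed = json.loads(contents)
    if strip_outer_array and isinstance(parsed, list):
        # Outer brackets removed: every array element becomes its own row.
        return parsed
    # Otherwise the whole document is kept as a single value.
    return [parsed]


# A made-up staged file holding two change events.
staged_file = json.dumps([
    {"primary_key": "[1]", "row": '{"after": {"id": 1, "name": "Petee H"}}', "table": "office_dogs"},
    {"primary_key": "[2]", "row": '{"after": {"id": 2, "name": "Carl"}}', "table": "office_dogs"},
])

print(len(rows_from_staged_file(staged_file)))                           # 2
print(len(rows_from_staged_file(staged_file, strip_outer_array=False)))  # 1
```

With the option enabled, every change event ends up as its own row in the RECORD column, which is what the row count later in this article relies on.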


The last remaining step is to refresh the pipe.

ALTER PIPE cockroach_pipe REFRESH;
| dogs/ff0871b1-6f49-43a4-a929-958d07f74046.json | SENT |
| dogs/ff131d8d-3781-4cf6-8700-edd50dbb87de.json | SENT |
| dogs/ff216da1-4f9d-4b37-9776-bcd559dd4a6f.json | SENT |
| dogs/ff221430-4c3a-46be-bbc2-d335cc6cc9e3.json | SENT |
| dogs/ffbd7d45-5084-4e36-8907-61874ac652b4.json | SENT |
| dogs/fffb5fa6-23cc-4450-934a-29ccf01c67b9.json | SENT |


Let's query the table in Snowflake:

SELECT * FROM OFFICE_DOGS LIMIT 5;
| { |
| "primary_key": "[5241]", |
| "row": "{\"after\": {\"id\": 5241, \"name\": \"5e0360a0d10d849afbbfa319a50bccf2\"}}", |
| "table": "office_dogs" |
| } |
| { |
| "primary_key": "[5242]", |
| "row": "{\"after\": {\"id\": 5242, \"name\": \"62be250249afe74bfbc5dd356e7b0ad9\"}}", |
| "table": "office_dogs" |
| } |
| { |
| "primary_key": "[5243]", |
| "row": "{\"after\": {\"id\": 5243, \"name\": \"7f286800a8a03e74938d09fdba52f869\"}}", |
| "table": "office_dogs" |
| } |
| { |
| "primary_key": "[5244]", |
| "row": "{\"after\": {\"id\": 5244, \"name\": \"16a330b8f09bcd314f9760ffe26d0ae2\"}}", |
| "table": "office_dogs" |
| }


We expect 10000 rows:

SELECT COUNT(*) FROM OFFICE_DOGS;
+----------+ 
| COUNT(*) |
|----------|
| 10000 |
+----------+


The data is in JSON format. Let's create a view and flatten the data out.

CREATE VIEW v_office_dogs AS
 SELECT PARSE_JSON(record:row):after:id::INTEGER AS id,
 PARSE_JSON(record:row):after:name::STRING AS name FROM OFFICE_DOGS;


Query the view:

SELECT * FROM v_office_dogs WHERE id < 6;
+----+----------------------------------+ 
| ID | NAME |
|----+----------------------------------|
| 1 | Petee H |
| 2 | Carl |
| 3 | 6e19280ae649efffa7a58584c7f46032 |
| 4 | 5e4e897f008bb752c8edfa64a3aed356 |
| 5 | abc0d898318d27f23a43060f89d62e34 |
+----+----------------------------------+


Verify

Let's make things a bit more interesting and delete data in CockroachDB.

DELETE FROM office_dogs WHERE name = 'Carl';
DELETE FROM office_dogs WHERE id = 1;


In Snowflake, let's refresh the pipe again, this time limiting it to files modified after a timestamp from a few minutes ago:

ALTER PIPE cockroach_pipe REFRESH MODIFIED_AFTER='2024-08-14T12:10:00-07:00';


Notice there are a couple of files.

+------------------------------------------------+--------+ 
| File | Status |
|------------------------------------------------+--------|
| dogs/2a4ee400-6b37-4513-97cb-097764a340bc.json | SENT |
| dogs/8f5b5b69-8a00-4dbf-979a-60c3814d96b4.json | SENT |
+------------------------------------------------+--------+


I must caution that if you run the REFRESH manually, you may cause duplicates in your Snowflake data. We will look at better approaches in a future article.

Let's look at the row count:

+----------+ 
| COUNT(*) |
|----------|
| 10002 |
+----------+


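The deletes were not mirrored in Snowflake as anticipated. The pipeline picked up the two delete events, but they were appended to the table as two additional rows (10002 in total) instead of removing the corresponding records, so the table no longer matches the state in CockroachDB. We need to incorporate additional logic to achieve this. This will be a task for another time.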
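
To give a sense of what that additional logic might look like, here is a minimal Python sketch that replays change events into a dictionary keyed by primary key. It rests on two assumptions that this article does not verify: that a delete arrives with "after" set to null (the shape of CockroachDB's default changefeed envelope), and that events are applied in the order they were emitted, which the staged files alone do not guarantee. The function name and the sample events are illustrative only.

```python
import json


def apply_change_events(events):
    """Hypothetical helper: fold change events into the latest state per key."""
    state = {}
    for event in events:
        key = event["primary_key"]
        after = json.loads(event["row"]).get("after")
        if after is None:
            # Assumed delete event: drop the row if we have it.
            state.pop(key, None)
        else:
            # Insert or update: keep only the most recent version.
            state[key] = after
    return state


# Illustrative events: three upserts followed by the two deletes from this section.
events = [
    {"primary_key": "[1]", "row": '{"after": {"id": 1, "name": "Petee H"}}', "table": "office_dogs"},
    {"primary_key": "[2]", "row": '{"after": {"id": 2, "name": "Carl"}}', "table": "office_dogs"},
    {"primary_key": "[3]", "row": '{"after": {"id": 3, "name": "6e19280ae649efffa7a58584c7f46032"}}', "table": "office_dogs"},
    {"primary_key": "[2]", "row": '{"after": null}', "table": "office_dogs"},
    {"primary_key": "[1]", "row": '{"after": null}', "table": "office_dogs"},
]

current = apply_change_events(events)
print(sorted(current))  # ['[3]']
```

Replaying the same ordered sequence a second time yields the same state, so this style of processing would also tolerate the duplicates that a manual REFRESH can introduce. In Snowflake, the equivalent would have to be expressed in SQL on top of the raw table.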

Lastly, I would like to note that running Redpanda Connect through a compose file is optional. You can also run the Docker container directly by executing the following command:

docker run --rm -it -v ./redpanda/connect.yaml:/connect.yaml -v ./snowflake/rsa_key.pem:/rsa_key.pem docker.redpanda.com/redpandadata/connect run

Conclusion

Today, we explored Redpanda Connect as a means to deliver streaming changefeeds into Snowflake. We've only just begun to delve into this topic, and future articles will build upon the foundations laid today.


Published at DZone with permission of Artem Ervits. See the original article here.

Opinions expressed by DZone contributors are their own.
