Snowflake to Postgres ETL

There are already plenty of guides on how to ETL data from Postgres to Snowflake, but I haven't seen one for the opposite direction. Here is a quick post on how to ETL data from Snowflake to Postgres.

For this post, we will assume you are using AWS, but the code will be very similar if you are using GCP or Azure.

Overview

Our approach is pretty straightforward: we'll export data as a CSV from Snowflake to S3, then import the CSV into Postgres.

Unload Data from Snowflake to S3

COPY INTO @stage/file.csv.gz
FROM (SELECT * FROM source_table)
ENCRYPTION = (TYPE = 'AWS_SSE_S3')
FILE_FORMAT = (
  FIELD_DELIMITER = ','
  FIELD_OPTIONALLY_ENCLOSED_BY = '"'
  COMPRESSION = GZIP
  BINARY_FORMAT = UTF8
  NULL_IF = ()
)
SINGLE = TRUE
HEADER = TRUE
OVERWRITE = TRUE
MAX_FILE_SIZE = 1000000000;
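
The @stage reference above assumes you already have an external stage pointing at your S3 bucket. If you don't, here is a minimal sketch of setting one up; the integration name, role ARN, and bucket path are placeholders you would replace with your own:

-- Hypothetical storage integration granting Snowflake access to the bucket
CREATE STORAGE INTEGRATION s3_unload_integration
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::123456789012:role/snowflake-unload-role'
  STORAGE_ALLOWED_LOCATIONS = ('s3://s3_bucket/');

-- External stage that the COPY INTO above unloads to; the name matches the
-- @stage placeholder used in the COPY command
CREATE STAGE stage
  URL = 's3://s3_bucket/unload/'
  STORAGE_INTEGRATION = s3_unload_integration;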

For information on unloading into GCP or Azure blob storage instead, you can read this guide.

Load Data from S3 into Postgres

We'll use the aws_s3 Postgres extension to load the CSV from S3 into Postgres directly. Alternatively, you can download the CSV file to your application server and send it to Postgres with a COPY statement; the code is quite similar either way. In both cases it helps to understand Postgres's COPY command, because the aws_s3 extension uses it under the hood.
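
For reference, here is a rough sketch of the one-time extension setup on RDS/Aurora Postgres, plus the plain COPY shape it drives under the hood (the table and file names are placeholders):

-- One-time setup; CASCADE also installs the aws_commons dependency.
CREATE EXTENSION IF NOT EXISTS aws_s3 CASCADE;

-- The COPY statement shape the extension runs for you. If you instead
-- download the CSV to your application server, run this through psql's
-- \copy so the file is read client-side.
COPY public.destination_table (id, column1, column2)
FROM '/local/path/file.csv'
WITH (FORMAT csv, HEADER true);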

There are two patterns we can use for importing these records:

  1. UPSERT into an existing table

  2. Do a full table replace of an existing table

Upsert

BEGIN TRANSACTION;

-- Create an empty copy of the destination table's schema
CREATE TEMPORARY TABLE temp_table AS
TABLE public.destination_table
WITH NO DATA;

-- Import data from S3 into the temp table          
SELECT aws_s3.table_import_from_s3(
  'temp_table',                -- the table to import into 
  'ID,COLUMN1,COLUMN2',        -- The columns to be imported
  '(FORMAT csv, HEADER true)', -- options passed through to COPY
  's3_bucket',                 -- the S3 bucket location
  's3_file',                   -- the s3 file name
  'us-east-1'                  -- region
);

-- Upsert data from temp table into the destination table
INSERT INTO public.destination_table (ID, COLUMN1, COLUMN2)
SELECT ID, COLUMN1, COLUMN2 FROM temp_table
WHERE TRUE
ON CONFLICT (ID)
DO UPDATE SET COLUMN1 = excluded.COLUMN1, COLUMN2 = excluded.COLUMN2;


COMMIT TRANSACTION;
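
A small variation worth knowing: aws_s3 also accepts the S3 location as a single structured argument built with aws_commons.create_s3_uri, which can be tidier than passing the bucket, key, and region separately. A sketch, reusing the same placeholder names as above:

-- Same import as above, but with the S3 location bundled into one value
SELECT aws_s3.table_import_from_s3(
  'temp_table',
  'ID,COLUMN1,COLUMN2',
  '(FORMAT csv, HEADER true)',
  aws_commons.create_s3_uri('s3_bucket', 's3_file', 'us-east-1')
);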

Table Replace

BEGIN TRANSACTION;

-- Create a copy of the destination table with no data
-- Note that this does not copy indexes or constraints
CREATE TABLE public.destination_table_temp AS
TABLE public.destination_table
WITH NO DATA;

-- Import data into the copied table from S3
SELECT aws_s3.table_import_from_s3(
  'public.destination_table_temp',
  'ID,COLUMN1,COLUMN2',
  '(FORMAT csv, HEADER true)',
  's3_bucket',
  's3_file',
  'us-east-1'
);

-- Rename tables and drop the old table
ALTER TABLE public.destination_table RENAME TO destination_table_old;
ALTER TABLE public.destination_table_temp RENAME TO destination_table;
DROP TABLE public.destination_table_old;

-- Add the primary key, constraints and indexes on the new table
ALTER TABLE ONLY public.destination_table
  ADD CONSTRAINT destination_table_pkey PRIMARY KEY (ID);
CREATE INDEX index_destination_table_on_column1
  ON public.destination_table USING btree (COLUMN1);

COMMIT TRANSACTION;

One thing to note here is that we don't add indexes or constraints until after the import. This is deliberate: loading into an unindexed table reduces the amount of I/O the import requires.

Tips

  • Be aware of how Snowflake types map to Postgres types. Prevent downstream errors by coercing columns to more compatible data types when unloading from Snowflake.

  • When exporting from Snowflake to S3, make sure NULL values are handled correctly. By default Snowflake writes NULL as the string \N in the CSV, while Postgres's CSV default is to interpret the empty string '' as NULL. In our example we handle this by setting NULL_IF = () under FILE_FORMAT when unloading, which tells Snowflake to export NULL as '' and aligns with Postgres's default behavior. Alternatively, you can keep Snowflake's default marker and tell Postgres about it, as shown in the sketch after this list.

  • Use the correct file encoding for your use case. In the example we used UTF-8, but you may prefer another encoding. You can read more on this here.
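
For the NULL tip above, here is a sketch of the Postgres-side alternative: keep Snowflake's default \N marker and pass a NULL option through to COPY instead. It reuses the placeholder table, bucket, and file names from the upsert example:

-- Treat Snowflake's default '\N' NULL marker as NULL on the Postgres side,
-- instead of changing NULL_IF on the Snowflake side
SELECT aws_s3.table_import_from_s3(
  'temp_table',
  'ID,COLUMN1,COLUMN2',
  '(FORMAT csv, HEADER true, NULL ''\N'')',
  's3_bucket',
  's3_file',
  'us-east-1'
);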
