Druid Cookbook

In an earlier blog, I demonstrated a technique to combine existing and new data in Druid batch ingestion, more or less emulating what is usually expressed in SQL as a MERGE or UPSERT statement. That technique relies on a combining input source and works only with JSON-based ingestion. It also operates on bulk data: you replace an entire range of data based on a time interval and key range.

Today I am going to look at a similar, albeit more surgical, construction that achieves the same MERGE/UPSERT effect. This time I will be using the SQL-based ingestion that is available in newer versions of Druid.

The MERGE statement, in a simplified way, works like this:

MERGE INTO druid_table
USING (SELECT * FROM external_table) AS external_table
ON druid_table.keys = external_table.keys
WHEN MATCHED THEN UPDATE ...
WHEN NOT MATCHED THEN INSERT ...

So, you compare the old data (druid_table) and the new data (external_table) with respect to a matching condition. In practice, this is a combination of timestamp and key fields, denoted by keys in the pseudocode above. There are three possible outcomes for any combination of keys (a fuller example follows the list):

  1. If keys exist only in druid_table, leave that data untouched.
  2. If keys exist in both tables, replace the row(s) in druid_table with those from external_table.
  3. If keys exist only in external_table, insert that data into druid_table.
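
For reference, here is roughly what a complete statement could look like in a database that does support MERGE, written against the column names of the ad data set introduced below. This is illustrative only and will not run in Druid:

-- Illustrative only: standard SQL MERGE, which Druid SQL does not accept.
-- The column names anticipate the sample ad data used later in this post.
MERGE INTO druid_table AS t
USING external_table AS s
  ON t."__time" = s."__time" AND t."ad_network" = s."ad_network"
WHEN MATCHED THEN
  UPDATE SET "ads_impressions" = s."ads_impressions", "ads_revenue" = s."ads_revenue"
WHEN NOT MATCHED THEN
  INSERT ("__time", "ad_network", "ads_impressions", "ads_revenue")
  VALUES (s."__time", s."ad_network", s."ads_impressions", s."ads_revenue")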

But Druid SQL does not offer a MERGE statement, at least not at the time of this writing. Can we do this in SQL anyway? Stay tuned if you want to know!

This tutorial works with the Druid 28 quickstart.

Recap: the data

Let’s use the same data as in the bulk upsert blog: daily aggregated viewership data from various ad networks.

{"date": "2023-01-01T00:00:00Z", "ad_network": "gaagle", "ads_impressions": 2770, "ads_revenue": 330.69}
{"date": "2023-01-01T00:00:00Z", "ad_network": "fakebook", "ads_impressions": 9646, "ads_revenue": 137.85}
{"date": "2023-01-01T00:00:00Z", "ad_network": "twottr", "ads_impressions": 1139, "ads_revenue": 493.73}
{"date": "2023-01-02T00:00:00Z", "ad_network": "gaagle", "ads_impressions": 9066, "ads_revenue": 368.66}
{"date": "2023-01-02T00:00:00Z", "ad_network": "fakebook", "ads_impressions": 4426, "ads_revenue": 170.96}
{"date": "2023-01-02T00:00:00Z", "ad_network": "twottr", "ads_impressions": 9110, "ads_revenue": 452.2}
{"date": "2023-01-03T00:00:00Z", "ad_network": "gaagle", "ads_impressions": 3275, "ads_revenue": 363.53}
{"date": "2023-01-03T00:00:00Z", "ad_network": "fakebook", "ads_impressions": 9494, "ads_revenue": 426.37}
{"date": "2023-01-03T00:00:00Z", "ad_network": "twottr", "ads_impressions": 4325, "ads_revenue": 107.44}
{"date": "2023-01-04T00:00:00Z", "ad_network": "gaagle", "ads_impressions": 8816, "ads_revenue": 311.53}
{"date": "2023-01-04T00:00:00Z", "ad_network": "fakebook", "ads_impressions": 8955, "ads_revenue": 254.5}
{"date": "2023-01-04T00:00:00Z", "ad_network": "twottr", "ads_impressions": 6905, "ads_revenue": 211.74}
{"date": "2023-01-05T00:00:00Z", "ad_network": "gaagle", "ads_impressions": 3075, "ads_revenue": 382.41}
{"date": "2023-01-05T00:00:00Z", "ad_network": "fakebook", "ads_impressions": 4870, "ads_revenue": 205.84}
{"date": "2023-01-05T00:00:00Z", "ad_network": "twottr", "ads_impressions": 1418, "ads_revenue": 282.21}
{"date": "2023-01-06T00:00:00Z", "ad_network": "gaagle", "ads_impressions": 7413, "ads_revenue": 322.43}
{"date": "2023-01-06T00:00:00Z", "ad_network": "fakebook", "ads_impressions": 1251, "ads_revenue": 265.52}
{"date": "2023-01-06T00:00:00Z", "ad_network": "twottr", "ads_impressions": 8055, "ads_revenue": 394.56}
{"date": "2023-01-07T00:00:00Z", "ad_network": "gaagle", "ads_impressions": 4279, "ads_revenue": 317.84}
{"date": "2023-01-07T00:00:00Z", "ad_network": "fakebook", "ads_impressions": 5848, "ads_revenue": 162.96}
{"date": "2023-01-07T00:00:00Z", "ad_network": "twottr", "ads_impressions": 9449, "ads_revenue": 379.21}

Save this file as data1.json. Also, save the “new data” bit:

{"date": "2023-01-03T00:00:00Z", "ad_network": "gaagle", "ads_impressions": 4521, "ads_revenue": 378.65}
{"date": "2023-01-04T00:00:00Z", "ad_network": "gaagle", "ads_impressions": 4330, "ads_revenue": 464.02}
{"date": "2023-01-05T00:00:00Z", "ad_network": "gaagle", "ads_impressions": 6088, "ads_revenue": 320.57}
{"date": "2023-01-06T00:00:00Z", "ad_network": "gaagle", "ads_impressions": 3417, "ads_revenue": 162.77}
{"date": "2023-01-07T00:00:00Z", "ad_network": "gaagle", "ads_impressions": 9762, "ads_revenue": 76.27}
{"date": "2023-01-08T00:00:00Z", "ad_network": "gaagle", "ads_impressions": 1484, "ads_revenue": 188.17}
{"date": "2023-01-09T00:00:00Z", "ad_network": "gaagle", "ads_impressions": 1845, "ads_revenue": 287.5}

as data2.json.

Initial data ingestion

Let’s ingest the first data set. We want to set the segment granularity to month, so the ingestion statement uses a PARTITIONED BY MONTH clause. Moreover, we enforce secondary partitioning by choosing REPLACE mode and by including a CLUSTERED BY clause. Here’s the complete statement (replace the path in baseDir with the path you saved the sample file to):

REPLACE INTO "ad_data" OVERWRITE ALL
WITH "ext" AS (
  SELECT *
  FROM TABLE(
    EXTERN(
      '{"type":"local","baseDir":"/<my base path>","filter":"data1.json"}',
      '{"type":"json"}'
    )
  ) EXTEND ("date" VARCHAR, "ad_network" VARCHAR, "ads_impressions" BIGINT, "ads_revenue" DOUBLE)
)
SELECT
  TIME_PARSE("date") AS "__time",
  "ad_network",
  "ads_impressions",
  "ads_revenue"
FROM "ext"
PARTITIONED BY MONTH
CLUSTERED BY "ad_network"

You can run this SQL from the Query tab in the Druid console:

Console running initial ingestion

Or you can use the Ingest wizard to enter the same code.
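
Either way, once the ingestion task finishes, a quick sanity check against the new datasource should return the 21 rows from data1.json:

SELECT "__time", "ad_network", "ads_impressions", "ads_revenue"
FROM "ad_data"
ORDER BY "__time", "ad_network"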

The merge query

Many thanks to John Kowtko for pointing out this approach. Since we don’t have a MERGE statement, let’s emulate it using a FULL OUTER JOIN. Druid’s MSQ engine supports sort/merge joins between tables of arbitrary size, so we can actually pull this off!

Important note: the sort/merge join algorithm has to be requested explicitly through a query context parameter. Open up the query engine menu next to the Preview button, and select Edit context:

Second ingestion with context

Add { "sqlJoinAlgorithm": "sortMerge" } to the query context.

Then run the ingestion query:

REPLACE INTO "ad_data" OVERWRITE ALL
WITH "ext" AS (
  SELECT *
  FROM TABLE(
    EXTERN(
      '{"type":"local","baseDir":"/<my base path>","filter":"data2.json"}',
      '{"type":"json"}'
    )
  ) EXTEND ("date" VARCHAR, "ad_network" VARCHAR, "ads_impressions" BIGINT, "ads_revenue" DOUBLE)
)
SELECT
  COALESCE("new_data"."__time", "ad_data"."__time") AS "__time",
  COALESCE("new_data"."ad_network", "ad_data"."ad_network") AS "ad_network",
  CASE WHEN "new_data"."ad_network" IS NOT NULL THEN "new_data"."ads_impressions" ELSE "ad_data"."ads_impressions" END AS "ads_impressions",
  CASE WHEN "new_data"."ad_network" IS NOT NULL THEN "new_data"."ads_revenue" ELSE "ad_data"."ads_revenue" END AS "ads_revenue"
FROM
  "ad_data"
FULL OUTER JOIN
  ( SELECT
    TIME_PARSE("date") AS "__time",
    "ad_network",
    "ads_impressions",
    "ads_revenue"
  FROM "ext" ) "new_data"
ON "ad_data"."__time" = "new_data"."__time" AND "ad_data"."ad_network" = "new_data"."ad_network"
PARTITIONED BY MONTH
CLUSTERED BY "ad_network"

Analysis of the query

What have we done here?

We are emulating the MERGE statement with a full outer join. The left side table is the data we already have in Druid; the right side is the new data. Our merge key is a combination of timestamp (daily granularity) and ad network.

For each key combination there are three possible outcomes:

  1. If the right hand side is null, leave the left hand side data as the result (leave old data untouched).
  2. If neither side is null, replace the row(s) in the existing table with new data from the right hand side (update rows).
  3. If the left hand side is null, insert the right hand side data into Druid.

This is exactly what we wanted to happen.
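
To spot-check this, query the gaagle rows after running the merge. Going by the two sample files, 2023-01-01 and 2023-01-02 should still show the original values, 2023-01-03 through 2023-01-07 should show the values from data2.json (for instance, 4521 impressions on 2023-01-03), and 2023-01-08 and 2023-01-09 should appear as newly inserted rows:

SELECT "__time", "ads_impressions", "ads_revenue"
FROM "ad_data"
WHERE "ad_network" = 'gaagle'
ORDER BY "__time"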

In order to pick the correct value for each column of the result, we look at the join key:

  • Key fields are handled with a COALESCE expression: COALESCE("new_data"."ad_network", "ad_data"."ad_network") AS "ad_network" selects the key field from the right hand side, and if that one is null (because no matching right hand side row exists), the left hand side instead.
  • For non-key fields the expression is a bit more complex, because we still have to decide based on the key field. Otherwise genuine null values in the data could create inconsistencies where rows are overwritten only partially. Hence an expression like CASE WHEN "new_data"."ad_network" IS NOT NULL THEN "new_data"."ads_impressions" ELSE "ad_data"."ads_impressions" END AS "ads_impressions". A small standalone illustration of the difference follows below.
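
Here is a minimal, self-contained sketch of that difference. It runs as a plain query in the Druid console; the literal values are made up and stand in for a matching new row whose measure is a genuine null, while the old row’s measure is 10:

SELECT
  -- Coalescing on the measure lets the old value (10) sneak back in:
  COALESCE(CAST(NULL AS BIGINT), 10) AS "with_coalesce",
  -- Deciding on the key ('gaagle' stands in for the matched key) lets the new row win completely, null and all:
  CASE WHEN 'gaagle' IS NOT NULL THEN CAST(NULL AS BIGINT) ELSE 10 END AS "with_case"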

Can we be more selective?

You might be thinking that this approach entails rewriting all the data in the existing table, even if the range of new data is much more limited. And you would be right. Fortunately, it is possible to limit the date range to be overwritten.

Let’s try this. At first glance, it seems we can specify the date range like so:

REPLACE INTO "ad_data" OVERWRITE WHERE __time >= TIMESTAMP'2023-01-03' AND __time < TIMESTAMP'2023-01-10'
...

Alas, this doesn’t work:

Granularity error

The date filter has to be aligned with the segment boundaries (here: monthly, because of PARTITIONED BY MONTH), otherwise Druid will refuse to run the query. This is actually a Good Thing: in JSON ingestion mode you would be able to overwrite a whole segment with data covering a smaller date range, potentially deleting data that you actually wanted to keep!
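
If you are unsure where the segment boundaries of your datasource lie, you can look them up in the sys.segments metadata table:

SELECT "start", "end", "num_rows"
FROM sys.segments
WHERE "datasource" = 'ad_data'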

If we adjust the date range clause to match the segment boundaries:

REPLACE INTO "ad_data" OVERWRITE WHERE __time >= TIMESTAMP'2023-01-01' AND __time < TIMESTAMP'2023-02-01'
...

the ingestion query works fine and we get the desired result:

Query

Use the new graphical exploration mode of Druid to get an idea of the data:

Explore

Learnings

  • You can emulate the effect of a MERGE statement in Druid with a full outer join.
  • Make sure to enable the sort/merge join algorithm in the query context.
  • Some care is needed around null values in the outer join result.
  • You can limit the range of data for reprocessing using OVERWRITE WHERE ..., but take care to align the time filter with your segment granularity.

This image, “Page 500 of Praktisches Kochbuch für die gewöhnliche und feinere Küche” by Medical Heritage Library, Inc., is licensed under CC BY-NC-SA 2.0.