Citus Example: Ad Analytics

by citusdata


Citus Example: Ad Analytics


Example app that uses the distributed Citus database to provide a realtime ad analytics dashboard.

You can see the deployed version at

Deploying on Citus Cloud and Heroku

  1. Signup for a Citus Cloud account:
  2. Provision a new formation of servers (they are billed hourly), a small one is good for testing
  3. On the formation detail page, wait until the cluster is configured, then click Show Full URL and copy it to your clipboard
    <img src="" width=400" />
  4. Click the "Deploy to Heroku" button and enter the URL from your clipboard as DATABASE_URL:

5. Run `heroku run:detached rake test_data:load_bulk` to load sample data - this will take a while

After starting the data load task visit the app to see an example of what you can build with Citus.

Note: If you get an error like "could not find the source blob" on Heroku deploy, just click the Deploy button again.


Schema Diagram

We're distributing only the part of our dataset that we expect to take significant amounts of space, specifically ads, clicks and impressions.

We use ad_id as the common shard key for the hash distribution, in order to have data for a specific ad colocated on one shard.

Feature Highlight: Colocated Joins

To join two large tables efficiently, it is advised that you distribute them on the same columns you used to join the tables. In this case, the Citus master knows which shards of the tables might match with shards of the other table by looking at the distribution column metadata. This allows Citus to prune away shard pairs which cannot produce matching join keys. The joins between remaining shard pairs are executed in parallel on the workers and then the results are returned to the master.

In this demo app we're showing co-located joins between the ads table and the impressions and clicks tables. One example query:

SELECT ads.campaign_id, COUNT(*)
  FROM ads
       JOIN impressions ON ( = ad_id)
 WHERE ads.campaign_id IN (1,2,3,4,5,6,7,8,9,10,11)
 GROUP BY ads.campaign_id

Another example co-located join we use is to find the data for the daily click-through-rate graph on e.g. - which uses the roll-up tables and looks like this:

           extract(epoch from AS day,
           CASE WHEN idr.count > 0 THEN COALESCE(cdr.count, 0) / idr.count::float
           ELSE NULL
           END AS ctr
      FROM ads
           JOIN impression_daily_rollups idr ON (idr.ad_id =
           JOIN click_daily_rollups cdr ON (idr.ad_id = cdr.ad_id AND =
     WHERE ads.campaign_id = 2 AND BETWEEN '2016-05-25 00:00:00 UTC' AND '2016-06-24 23:59:59 UTC'
     ORDER BY 2

Feature Highlight: Daily Rollups

This demo app also shows how to work with historic data effectively. Since our impressions/clicks data is append-only, we can make a few optimizations for all data that is older than the current day.

Specifically, we can roll-up the data into daily count values, so we avoid having to read the entire table when we want to find the total amount of clicks for a given ad or campaign.

citus=> \d impression_daily_rollups
Table "public.impression_daily_rollups"
 Column |  Type  | Modifiers
 ad_id  | uuid   | not null
 count  | bigint | not null
 date   | date   | not null
    "impression_daily_rollups_pkey" PRIMARY KEY, btree (ad_id, date)

citus=> \d click_daily_rollups
Table "public.click_daily_rollups"
 Column |  Type  | Modifiers
 ad_id  | uuid   | not null
 count  | bigint | not null
 date   | date   | not null
    "click_daily_rollups_pkey" PRIMARY KEY, btree (ad_id, date)

You can see the task that runs daily here:

Feature Highlight: Single-node transactions

With Citus you can use transactions in your code, as long as they only touch a single node. This can also be used to update multiple tables which are co-located.

In this app this is used to allow Rails' counter_cache: true and touch: true to update the parent record correctly.


irb(main):003:0> impression.destroy
DELETE FROM "impressions" WHERE "impressions"."impression_id" = 'fffff511-7012-4c5e-8431-5f97efd72926' AND "impressions"."ad_id" = '7fc94c84-f39f-4c7d-bf9e-bdbf5211a2f9'
SELECT  "ads".* FROM "ads" WHERE "ads"."id" = '7fc94c84-f39f-4c7d-bf9e-bdbf5211a2f9' LIMIT 1
UPDATE "ads" SET "impressions_count" = COALESCE("impressions_count", 0) - 1 WHERE "ads"."id" = '7fc94c84-f39f-4c7d-bf9e-bdbf5211a2f9'
UPDATE "ads" SET "updated_at" = '2016-07-22 23:52:59.667746' WHERE "ads"."id" = '7fc94c84-f39f-4c7d-bf9e-bdbf5211a2f9'

Feature Highlight: BRIN indices to find recent data

In order to also include recent data into count values that are displayed, we're using a BRIN index on impressions.seen_at and clicks.clicked_at to quickly find the recent records which are not contained in the roll-up tables yet.

You can see an example query on the campaign index and detail pages, e.g.

SELECT ad_id, COUNT(*)
         FROM ads
         JOIN clicks ON ( = ad_id)
        WHERE ads.campaign_id = 1
              AND clicked_at > now()::date
        GROUP BY ad_id

The distributed EXPLAIN output for this query shows how it uses the lossy BRIN index to find the values on the worker nodes:

                                                                             QUERY PLAN                                                                             
 Distributed Query into pg_merge_job_6969
   Executor: Real-Time
   Task Count: 16
   Tasks Shown: One of 16
   ->  Task
         Node: port=5432 dbname=citus
         ->  HashAggregate  (cost=456.49..456.50 rows=1 width=16) (actual time=0.660..0.660 rows=0 loops=1)
               Group Key: clicks.ad_id
               ->  Nested Loop  (cost=12.91..456.48 rows=2 width=16) (actual time=0.658..0.658 rows=0 loops=1)
                     Join Filter: ( = clicks.ad_id)
                     Rows Removed by Join Filter: 970
                     ->  Seq Scan on ads_102137 ads  (cost=0.00..1.64 rows=1 width=16) (actual time=0.009..0.013 rows=1 loops=1)
                           Filter: (campaign_id = 6)
                           Rows Removed by Filter: 50
                     ->  Bitmap Heap Scan on clicks_102169 clicks  (cost=12.91..453.39 rows=116 width=16) (actual time=0.100..0.459 rows=970 loops=1)
                           Recheck Cond: (clicked_at > (now())::date)
                           Rows Removed by Index Recheck: 48
                           Heap Blocks: lossy=19
                           ->  Bitmap Index Scan on clicks_clicked_at_brin_102169  (cost=0.00..12.88 rows=116 width=0) (actual time=0.083..0.083 rows=1280 loops=1)
                                 Index Cond: (clicked_at > (now())::date)
             Planning time: 0.484 ms
             Execution time: 0.753 ms
 Master Query
   ->  HashAggregate  (cost=0.00..0.15 rows=10 width=0) (actual time=0.001..0.001 rows=0 loops=1)
         Group Key: intermediate_column_6969_0
         ->  Seq Scan on pg_merge_job_6969  (cost=0.00..0.00 rows=0 width=0) (actual time=0.001..0.001 rows=0 loops=1)
 Planning time: 7.291 ms
(28 rows)


Copyright (c) 2016, Citus Data Inc

Licensed under the MIT license - feel free to incorporate the code in your own projects!