At Automattic, we have a lot of data from WordPress.com, our flagship product. We have over 90 million users, and 100 million blogs. Our data team is constantly analyzing our data to discover how we can better serve our users. In 2015, one of our big focuses has been to improve the new user experience. As part of this we have been doing funnel analyses for our signup process. That is, for every person who starts our signup process, what percent gets to step 2, step 3, and then completes signup. To create these funnels we have to keep track of these events. We’re doing this using our Hadoop cluster, with a setup which includes Kafka, Flume, and Hive. We have had this store of events for some time now. More recently, we are also planning to add a separate store of user properties, which will allow us to gain further insight into what our users are doing, especially with regards to user retention, another one of our focuses for 2015. That is, once a user has signed up, how do we keep them coming back?
We have been exploring a number of different technologies for this user property store. One early experiment we did was with HBase. HBase is known to be very fast at retrieving data from very large dataset by key, and it supports a dynamic schema with wide rows, meaning that we can store lots of information per user, and add in new information arbitrarily, without having to update a schema. Unfortunately, HBase is not very performant at performing aggregations, which is one of our main use cases. We’d like to be able to query our event and user datastores together to answer questions like “How many people who signed up in October published a post in November”? It doesn’t seem like HBase will work that well, so instead we are exploring Parquet, which is also a columnar format datastore, and is known to handle nested data well.
My first exploration was to use some existing data we have, in which we sampled our MySQL database tables to get a large set of users, and all sorts of interesting information about what they have done on WordPress.com, such as publishing posts, liking posts and what not. I collected about 1.4 million of these, with the following schema:
In [225]: user_properties.printSchema() root |-- events: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- event_date: string (nullable = true) | | |-- event_data: map (nullable = true) | | | |-- key: string | | | |-- value: string (valueContainsNull = true) | | |-- name: string (nullable = true) | | |-- type: string (nullable = true) |-- geo_country_long: string (nullable = true) |-- geo_country_short: string (nullable = true) |-- jetpack_user: long (nullable = true) |-- last_seen_admin: string (nullable = true) |-- last_seen_blog: string (nullable = true) |-- last_updated: string (nullable = true) |-- sample_tags: array (nullable = true) | |-- element: string (containsNull = true) |-- subscriptions: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- subscription_data: map (nullable = true) | | | |-- key: string | | | |-- value: string (valueContainsNull = true) | | |-- subscription_status: string (nullable = true) |-- user_attributes: map (nullable = true) | |-- key: string | |-- value: string (valueContainsNull = true) |-- user_blogs: array (nullable = true) | |-- element: string (containsNull = true) |-- user_id: long (nullable = true) |-- user_meta: map (nullable = true) | |-- key: string | |-- value: string (valueContainsNull = true) |-- user_registration_date: string (nullable = true)
Currently we are using Hive for most of queries. We are looking to move to Spark, but in the short term, we will probably be sticking with Hive for most things. For my experiments, I have done queries both with Hive 1.1 and with PySpark 1.3.0., running on CDH 5.4.2, with a 94 node cluster with 32GB RAM per node.
One of the first things I wanted to go was to get a simple histogram of the most common events that users perform. The events is stored as an Array of Structs, with the event name being one of the fields in the struct. This is a good test case of Here are the queries I used for Hive and PySpark. For the next few analyses, I stored the data using Parquet, using 5 partitions, and ran the Spark queries with 5 executors.
Hive
select name, count(name) as count
from
(select explode(events.name) as name from
user_properties
) b
group by name
order by count desc
took about 52 seconds
PySpark
event_map = user_properties.flatMap(lambda x: [a.name for a in x.events]).countByValue()
for event in sorted(event_map.iteritems(), key=lambda x: x[1], reverse=True): print event
took about 80 seconds
Both Hive and Spark produced the same results. Spark was actually slower. I think there is still a lot of room for optimization with Spark. Here are the top 4 events:
like_out | 6468478 |
blog_subscribed | 4705225 |
follow_out | 3923466 |
comment_out | 1793021 |
As I mentioned above, Parquet is supposed to treat nested keys at top-level keys, and thus the performance aggregating on a top-level key versus a nested key should be about the same. To test this hypothesis for myself, I also made a histogram on country, which is a top-level key. It took roughly around the same amount of time as the nested key, which is good news.
Hive
SELECT geo_country_short, COUNT(geo_country_short) AS COUNT
FROM
user_properties
GROUP BY geo_country_short
ORDER BY COUNT DESC
took about 50 seconds
PySpark
country_map = user_properties.map(lambda x: x.geo_country_short).countByValue()
for country in sorted(country_map.iteritems(), key=lambda x: x[1], reverse=True): print country
took about 60 seconds
Both produced the same output. Here are the top five countries:
US | 550361 |
GB | 66734 |
FR | 62439 |
ID | 46766 |
CA | 45172 |
getting 7 day purchase by cohort
My next experiment was to get more realistic queries, using all of user info. I Sqooped in our user info of about 100 million users from our MySQL database. I used the default Sqoop settings which create one 15GB file stored as TextFileFormat.
One question we have been wanting to answer for some time is what percentage of users make a purchase within 7 days of signup up. This is a great stress test of doing some joins with our billing information and our user information.
Hive
select
from_unixtime(unix_timestamp(u.user_registered), 'yyyy-MM') as month, count(distinct(u.id)) as count
from billing_receipt_items b
inner join
user_properties u
on (u.id=b.user_id and b.`type`='new purchase')
where
datediff(b.date_only,substr(u.user_registered,1,10)) <= 7
group by from_unixtime(unix_timestamp(u.user_registered), 'yyyy-MM')
took about 1.5 minutes
pySpark
from datetime import datetime
def datediff(date1,date2):
"""Returns number of days difference between two dates. Date1 should be
before date2 normally"""
return (datetime.strptime(date2[:10], '%Y-%M-%d') - datetime.strptime(date1[:10], '%Y-%M-%d')).days
date_diff = UserDefinedFunction(lambda x,y: datediff(x,y), IntegerType())
users = sqc.createDataFrame(hive.sql("select user_registered, substr(user_registered, 1, 7) as month, id from users").rdd)
billing = sqc.createDataFrame(hive.sql("select user_id, date_only from billing_receipt_items").rdd)
joined = billing.join(users, billing.user_id==users.id)
purchase_hist=joined.filter(date_diff(joined.user_registered, joined.date_only)<=7).groupBy('month').count().collect()
Using default partitioning (about 200 partitions, of size 128 MB (our cluster block size)
took about 3 minutes
Using 50 partitions, took about 40 seconds
The above queries used the same fixed schemas as we have in our MySQL setup. My next step was to test out using a dynamic schema with Parquet. For this, I created just 2 columns - a user_id, and a Map of properties. For this part, we are going to assume that we don't have any nested structures - just simple string:string mappings.
DROP TABLE IF EXISTS `user_map`; CREATE TABLE `user_map`(
`user_id` int COMMENT '',
`props` map
ROW FORMAT SERDE
'parquet.hive.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'parquet.hive.DeprecatedParquetInputFormat'
OUTPUTFORMAT
'parquet.hive.DeprecatedParquetOutputFormat'
LOCATION
'hdfs://nameservice/user/hive/warehouse/user_map'
Performing the equivalent 7day purchase by monthly cohort for this schema took about 11 minutes with Hive, and 4 minutes with Spark, which is not that great. For Hive, the majority of the time seems to be spent in the reduce phase. As I started thinking about the schema more, and the types of queries we will commonly be performing, I started thinking more about a hybrid schema - that is, defining several commonly used static fields, and leaving the rest to be defined as part of the props Map field. In particular, it seems like one of our biggest use cases is restricting analyses by user registration date. This is also a great candidate for partitioning. I performed several more queries testing out other schemas and partitioning patterns, performing the same 7day monthly cohort query, but only for users who have signed up in 2015.
Hive
1. Fixed schema - 1 minute
2. Dynamic schema - 2.4 minutes
3. Hybrid schema with year_month as partition column - 1 minute
PySpark
1. Fixed schema - 3.5 minutes
2. Dynamic schema - 3.4 minutes
3. Hybrid schema with year_month as partition column - 1.1 minute
Cohort retention queries
Another type of query we have been running is to track the activity of users by their signup cohort. That is, we want to see how long people stick around. We created some queries to approximate this several months ago by taking advantage of the fact that our user_ids auto-increment, so we can get a rough estimate of monthly or quarterly cohorts this way. We'd like it to be more accurate though, and to be able to do more fine-grained analyses if we want. Since our current setup for this uses an Impala UDF, I thought I would try this query in Impala too, in addition to Hive and PySpark.
First
Impala
SELECT substr(cast(`date` as string), 1, 6) month,
substr(u.user_registered,1,7) as cohort,
COUNT(DISTINCT tracks.userid)
FROM default.tracks
inner join sushi_boats.users u
on cast(tracks.userid AS string) = u.id
WHERE source in ('wpcom', 'calypso')
AND eventname IN ('calypso_page_view','wpcom_admin_page_view')
AND `date` >= 20151001
AND useridtype='wpcom:user_id'
GROUP BY substr(cast(`date` as string), 1, 6), substr(u.user_registered,1,7)
HAVING cast(month as int) > 201503
Memory limit exceeded :(
Hive
SELECT substr(cast(`date` as string), 1, 6) month,
substr(u.user_registered,1,7) as cohort,
COUNT(DISTINCT tracks.userid)
FROM default.tracks
inner join sushi_boats.users u
on cast(tracks.userid AS string) = u.id
WHERE source in ('wpcom', 'calypso')
AND eventname IN ('calypso_page_view','wpcom_admin_page_view')
AND `date` >= 20151001
AND useridtype='wpcom:user_id'
GROUP BY substr(cast(`date` as string), 1, 6), substr(u.user_registered,1,7)
HAVING cast(month as int) > 201503
Took about 2.5 minutes
PySpark
users = sqc.createDataFrame(hive.sql("select user_registered, substr(user_registered, 1, 7) as cohort, id from sushi_boats.users").rdd)
tracks = sqc.createDataFrame(hive.sql("select userid, substr(`date`,1,6) as month from default.tracks WHERE source in ('wpcom', 'calypso') AND eventname IN ('calypso_page_view','wpcom_admin_page_view') AND `date` >= 20151001 AND useridtype='wpcom:user_id'").rdd)
joined = users.join(tracks, tracks.userid==str(users.id))
tracks_cohorts = joined.filter(date_diff(joined.user_registered, joined.date_only)<=7).groupBy(['month','cohort']).agg(countDistinct(joined.userid)).collect()
Took about 85 minutes
I was able to get this down to about 3 minutes by increasing the number of executors from 6 to 300
Using the user_map table (parquet)
Hive
SELECT substr(cast(`date` as string), 1, 6) month,
substr(u.props['user_registered'],1,7) as cohort,
COUNT(DISTINCT tracks.userid)
FROM default.tracks
inner join sushi_boats.user_map u
on cast(tracks.userid AS string) = u.user_id
WHERE source in ('wpcom', 'calypso')
AND eventname IN ('calypso_page_view','wpcom_admin_page_view')
AND `date` >= 20151001
AND useridtype='wpcom:user_id'
GROUP BY substr(cast(`date` as string), 1, 6), substr(u.props['user_registered'],1,7)
HAVING cast(month as int) > 201503
took about 3.5 minutes
PySpark
I tried 2 different
tracks_cohorts_map = hive.sql("SELECT substr(cast(`date` as string), 1, 6) month, substr(u.props['user_registered'],1,7) as cohort, COUNT(DISTINCT tracks.userid) FROM default.tracks inner join user_map u on cast(tracks.userid AS string) = u.user_id WHERE source in ('wpcom', 'calypso') AND eventname IN ('calypso_page_view','wpcom_admin_page_view') AND `date` >= 20151001 AND useridtype='wpcom:user_id' GROUP BY substr(cast(`date` as string), 1, 6), substr(u.props['user_registered'],1,7) HAVING cast(month as int) > 201503")
pprint(tracks_cohorts_map.collect())
200 executors: 8 minutes
500 executors: 5 minutes
750 executors: 4 minutes
users = sqc.createDataFrame(hive.sql("select props['user_registered'] as user_registered, substr(props['user_registered'], 1, 7) as cohort, cast(user_id as string) as userid from user_map cluster by userid").rdd)
tracks = sqc.createDataFrame(hive.sql("select userid, substr(`date`,1,6) as month from default.tracks WHERE source in ('wpcom', 'calypso') AND eventname IN ('calypso_page_view','wpcom_admin_page_view') AND `date` >= 20151001 AND useridtype='wpcom:user_id' cluster by userid").rdd)
joined = users.join(tracks, tracks.userid==users.userid)
tracks_cohorts = joined.groupBy('month','cohort').agg(joined.month,joined.cohort,countDistinct(users.userid).alias('count')).collect()
Took about 10 minutes
Using user_avro_map table (same schema as user_map, but stored in Avro)
Hive - 5.5 minutes (same query as above)
Using user_hybrid_map
Hive
SELECT substr(cast(`date` as string), 1, 6) month,
year_month_registered as cohort,
COUNT(DISTINCT tracks.userid)
FROM default.tracks
inner join user_hybrid_map u
on (cast(tracks.userid AS string) = u.user_id
AND source in ('wpcom', 'calypso')
AND eventname IN ('calypso_page_view','wpcom_admin_page_view')
AND `date` >= 20151001
AND useridtype='wpcom:user_id'
)
GROUP BY year_month_registered, substr(cast(`date` as string), 1, 6)
HAVING cast(month as int) > 201503
Took about 4 minutes
PySpark
For this one, I tried it 2 different ways - first just using the Spark HiveContext
tracks_cohorts_hybrid_map = hive.sql("SELECT substr(cast(`date` as string), 1, 6) month, year_month_registered as cohort, COUNT(DISTINCT tracks.userid) FROM default.tracks inner join user_hybrid_map u on cast(tracks.userid AS string) = u.user_id WHERE source in ('wpcom', 'calypso') AND eventname IN ('calypso_page_view','wpcom_admin_page_view') AND `date` >= 20151001 AND useridtype='wpcom:user_id' GROUP BY substr(cast(`date` as string), 1, 6), year_month_registered HAVING cast(month as int) > 201503")
pprint(tracks_cohorts_hybrid_map.collect())
Took about 11 minutes with 100 executors
I also did it using the DataFrame methods, which yielded the same results in a similar time
users = sqc.createDataFrame(hive.sql("select year_month_registered as cohort, cast(user_id as string) as userid from user_hybrid_map cluster by userid").rdd)
tracks = sqc.createDataFrame(hive.sql("select userid, substr(`date`,1,6) as month from default.tracks WHERE source in ('wpcom', 'calypso') AND eventname IN ('calypso_page_view','wpcom_admin_page_view') AND `date` >= 20151001 AND useridtype='wpcom:user_id' cluster by userid").rdd)
joined = users.join(tracks, tracks.userid==users.userid)
tracks_cohorts = joined.groupBy('month','cohort').agg(joined.month,joined.cohort,countDistinct(users.userid).alias('count'))
pprint(tracks_cohorts.collect())
Took about 12 minutes with 100 executors
In theory, it should not be necessary to use the createDataFrame methods here, since the hive.sql method returns a DataFrame, but when I tried doing that, I was getting weird errors like "user_id is not a known column in (user_id, user_registered)". Also note that the cluster by greatly reduced the number of tasks necessary in my join, since it was known which splits to compare to which.
Does this sound interesting? We're hiring