Greenplum Database is an interesting solution for data mining and data warehousing. In this post I focus on the MapReduce capabilities of Greenplum 4.1 and try to figure out how efficient its implementation is.
Simple MapReduce Job
Let us consider a simplified version of a real-life problem that is typically solved with the MapReduce technique: analysis of Internet traffic. Imagine that we have a table of events, and a new event is recorded each time a user visits a URL:
table IMPRESSION (id integer primary key, user_id integer, url text, time_stamp timestamp)
Suppose our goal is to calculate the total number of visits for each domain, such as wordpress.com. First, we need to parse each URL to extract its domain name. Next, we can emit the domain name as a key during the map phase and count the number of emitted records for each domain during the reduce phase. The Greenplum MapReduce job specification is as follows:
%YAML 1.1
---
VERSION: 1.0.0.1
DATABASE: gptest
USER: gpadmin
DEFINE:
  - INPUT:
      NAME: impression_urls
      QUERY: select url from IMPRESSION
  - MAP:
      NAME: url_py_splitter
      LANGUAGE: PYTHON
      PARAMETERS: [url]
      RETURNS: [key text, value integer]
      OPTIMIZE: STRICT IMMUTABLE
      MODE: MULTI
      FUNCTION: |
        from urlparse import urlparse
        loc = urlparse(url).netloc
        yield {'key': loc, 'value': 1}
EXECUTE:
  - RUN:
      SOURCE: impression_urls
      MAP: url_py_splitter
      REDUCE: COUNT
The EXECUTE/RUN section declares that the data specified in the DEFINE/INPUT section (the output of an SQL query) should be processed by the mapper declared in the DEFINE/MAP section and then reduced by the standard COUNT reducer, which, not surprisingly, simply counts the number of records emitted by the mapper for each key. Now we can submit this description to the gpmapreduce utility and obtain the following result (a small, artificially generated data set was used):
[gpadmin@gp-single-host src]$ gpmapreduce -f url_parsing.mapreduce
mapreduce_20586_run_1
key              |value
-----------------+-----
en.wikipedia.org |  395
www.google.com   |  497
www.greenplum.com|  108
...
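To make the map and reduce phases of this job concrete, here is a minimal sketch that mimics them in plain Python outside of Greenplum. It is only an illustration: the sample URLs are made up, count_reduce is a hypothetical stand-in for the built-in COUNT reducer, and the sketch uses Python 3's urllib.parse, whereas the job above imports the Python 2 urlparse module.

```python
from collections import Counter
from urllib.parse import urlparse  # Python 3 home of urlparse


def url_py_splitter(url):
    """Mirror of the MAP function: emit one (domain, 1) record per input row."""
    loc = urlparse(url).netloc
    yield {'key': loc, 'value': 1}


def count_reduce(records):
    """Hypothetical stand-in for REDUCE: COUNT, counting records per key."""
    counts = Counter()
    for record in records:
        counts[record['key']] += 1
    return dict(counts)


# Made-up rows standing in for the result of "select url from IMPRESSION"
sample_urls = [
    'http://en.wikipedia.org/wiki/MapReduce',
    'http://www.google.com/search?q=greenplum',
    'http://www.google.com/',
    'http://www.greenplum.com/products',
]

# Map phase: every input row may emit any number of records (MODE: MULTI)
emitted = [rec for url in sample_urls for rec in url_py_splitter(url)]

# Reduce phase: one output row per distinct key
result = count_reduce(emitted)
print(result)
```

The generator form of the mapper matches MODE: MULTI in the specification, where a single input row is allowed to yield several output records.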
By altering this job and running it on large data sets, we can make several simple observations about the MapReduce implementation:
- Greenplum is able to spill temporary results to disk: if one changes yield {'key': loc, 'value': 1} to yield {'key': loc + str(random.random()), 'value': 1}, each Segment server starts to use its disk intensively because the total number of keys is high.
- Greenplum is able to group data on the Segment side and reduce the size of the Mapper's output. For example, replacing yield {'key': loc, 'value': 1} with yield {'key': loc, 'value': 1 + random.random()} does not increase disk, memory, or network utilization, because Greenplum is able to aggregate emitted values by key if the Reduce function allows it (like SUM, COUNT, or MAX). Greenplum also provides the ability to specify custom aggregators for custom Reduce functions.
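The effect described in these two observations can be sketched in plain Python. This is not Greenplum code, only an illustration under assumptions: the two "segments" and their records are made up, and partial_sum and final_sum are hypothetical names for a segment-side pre-aggregation step and the final merge.

```python
from collections import defaultdict


def partial_sum(records):
    """Segment-side pre-aggregation: collapse emitted values to one sum per key."""
    sums = defaultdict(int)
    for key, value in records:
        sums[key] += value
    return dict(sums)


def final_sum(partials):
    """Merge the per-segment partial sums into the final result."""
    totals = defaultdict(int)
    for partial in partials:
        for key, value in partial.items():
            totals[key] += value
    return dict(totals)


# Made-up mapper output on two segments
segment_a = [('www.google.com', 1), ('www.google.com', 1), ('en.wikipedia.org', 1)]
segment_b = [('www.google.com', 1), ('en.wikipedia.org', 1)]

partials = [partial_sum(segment_a), partial_sum(segment_b)]
emitted = len(segment_a) + len(segment_b)    # records produced by the mappers
shipped = sum(len(p) for p in partials)      # records that leave the segments
totals = final_sum(partials)

# With a random suffix every key is unique, so nothing can be collapsed
unique_keys = [('www.google.com' + str(i), 1) for i in range(3)]
uncollapsed = len(partial_sum(unique_keys))

print(emitted, shipped, totals, uncollapsed)
```

Pre-aggregation works here because addition is associative and commutative, so summing partial sums gives the same answer as summing all values at once. When every key is unique, the partial result is as large as the mapper output, which is consistent with the heavy disk usage seen in the first observation.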
Optimization of Composite Jobs
The MapReduce framework in Greenplum is notable for how easily SQL queries and multiple MapReduce tasks can be combined in a single job. Greenplum's notation is more powerful and less verbose than that of the Hadoop stack: it clearly outdoes raw Hadoop and Hive in this respect, and it remains competitive even with Pig's notation.
In this section we try to understand how efficiently Greenplum can process jobs that consist of multiple SQL queries and MapReduce tasks. Let us consider an example that runs two SQL queries (the values_01 and values_02 inputs) to retrieve input data for two MapReduce tasks (both using the tuples_py_counter mapper) and routes the outputs of these tasks to a third MapReduce task (ident) that produces the final result. The TUPLES table is just a set of random (id, value) pairs:
%YAML 1.1
---
VERSION: 1.0.0.1
DATABASE: gptest
USER: gpadmin
DEFINE:
  - INPUT:
      NAME: values_01
      QUERY: select value from TUPLES where id < 500
  - INPUT:
      NAME: values_02
      QUERY: select value from TUPLES where id > 500
  - MAP:
      NAME: tuples_py_counter
      LANGUAGE: PYTHON
      PARAMETERS: [value]
      RETURNS: [key text, value integer]
      OPTIMIZE: STRICT IMMUTABLE
      MODE: MULTI
      FUNCTION: |
        yield {'key': 'sum', 'value': value}
  - TASK:
      NAME: task_01
      SOURCE: values_01
      MAP: tuples_py_counter
      REDUCE: SUM
  - TASK:
      NAME: task_02
      SOURCE: values_02
      MAP: tuples_py_counter
      REDUCE: SUM
  - INPUT:
      NAME: final_input
      QUERY: select key, value from task_01 union all select key, value from task_02
  - MAP:
      NAME: ident
      LANGUAGE: PYTHON
      PARAMETERS: [key, value]
      RETURNS: [key text, value integer]
      FUNCTION: |
        yield {'key': key, 'value': value}
EXECUTE:
  - RUN:
      SOURCE: final_input
      MAP: ident
      REDUCE: IDENTITY
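The data flow of this job can be traced with a small plain-Python sketch. It says nothing about how Greenplum actually executes the job; it only shows what each stage computes. The contents of the tuples list are made up, and run_sum_task is a hypothetical helper that combines a MAP step with a SUM reducer.

```python
# Made-up stand-in for the TUPLES table: (id, value) pairs
tuples = [(1, 10), (250, 20), (499, 30), (501, 40), (900, 50)]


def tuples_py_counter(value):
    """Mirror of the MAP function: route every value to the single key 'sum'."""
    yield {'key': 'sum', 'value': value}


def ident(key, value):
    """Mirror of the pass-through MAP function of the final stage."""
    yield {'key': key, 'value': value}


def run_sum_task(values):
    """Hypothetical helper: MAP with tuples_py_counter, then REDUCE: SUM."""
    totals = {}
    for value in values:
        for rec in tuples_py_counter(value):
            totals[rec['key']] = totals.get(rec['key'], 0) + rec['value']
    return [(key, total) for key, total in totals.items()]


# The two INPUT queries
values_01 = [v for (i, v) in tuples if i < 500]
values_02 = [v for (i, v) in tuples if i > 500]

# The two TASK definitions
task_01 = run_sum_task(values_01)
task_02 = run_sum_task(values_02)

# final_input: "... from task_01 union all ... from task_02"
final_input = task_01 + task_02

# EXECUTE/RUN: ident mapper followed by the IDENTITY reducer (no further change)
final_output = [(rec['key'], rec['value'])
                for key, value in final_input
                for rec in ident(key, value)]
print(final_output)
```

Note that the final stage does not merge the two partial sums: both rows carry the key 'sum', and the IDENTITY reducer passes them through as they are.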
By analyzing the query plan and measuring execution time, we can infer that Greenplum's MapReduce is intelligent enough to recognize the task hierarchy and optimize MapReduce jobs similarly to SQL queries.
Let’s now add an additional independent SQL clause to the input expression for the third MapReduce task, i.e. replace
select key, value from task_01 union all select key, value from task_02
by
select key, value from task_01 union all select key, value from task_02 union all select 'last_sum', sum(value) from TUPLES where id < 50
In this case Greenplum is still able to trace dependencies between the different parts of the job and run the two MapReduce tasks and the extra SQL query in parallel.
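Why these three branches can run side by side becomes visible once the job is written down as a dependency graph. The sketch below uses Python's standard graphlib module (Python 3.9+) to group the parts of the job into waves of mutually independent steps. It is not Greenplum's planner, and the labels extra_sql and run_ident are made-up names for the added SQL branch and the final EXECUTE/RUN step.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each node maps to the set of nodes it depends on. Names follow the job
# above; 'extra_sql' and 'run_ident' are hypothetical labels.
dependencies = {
    'task_01': {'values_01'},
    'task_02': {'values_02'},
    'final_input': {'task_01', 'task_02', 'extra_sql'},
    'run_ident': {'final_input'},
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()

waves = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # everything runnable right now
    waves.append(ready)
    sorter.done(*ready)                 # mark the whole wave as finished

for number, wave in enumerate(waves, start=1):
    print(number, wave)
```

The wave view is conservative: extra_sql depends on nothing, so a real executor could also overlap it with task_01 and task_02 rather than only with the two input queries.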
MapReduce Jobs and Transactions
It's worth noting that MapReduce jobs in Greenplum appear to be executed within transaction boundaries. If a job runs concurrently with queries that update the tables the job works with, the results of the job appear to be consistent, at least in all the experiments I have made.
MapReduce and Workload Management
Workload management, the ability to assign resource quotas (CPU etc.) to different users, is a very important aspect of data warehousing systems because such systems are often shared by multiple business flows (production queries, analysts, etc.) and are prone to heavy queries. Without proper workload management there is a significant risk that one flow will be blocked by another. Greenplum Database offers quite flexible workload management, which is based on user roles.
Unfortunately, most languages in which MapReduce functions can be written (Python, for example) are so-called untrusted languages, which implies that MapReduce jobs can be executed only with admin permissions. This makes workload management for MapReduce problematic.
Conclusion
Greenplum appears to have a quite powerful and consistent MapReduce implementation. Imperative MapReduce tasks can be conveniently blended with SQL queries and chained into multistage jobs. It seems that the MapReduce implementation is seamlessly integrated into the query execution pipeline and fully eligible for execution plan optimization, transaction management, etc. From this point of view Greenplum looks much more elegant than the Hadoop/Hive alternative. On the other hand, workload management and debugging capabilities seem to be weak, although this can't be considered a crucial issue.


Posted on January 1, 2012