Can Shard-Query scale to 20 nodes?
Peter asked this question in the comments on my previous Shard-Query benchmark. Actually, he asked if it could scale to 50, but testing 20 nodes was all I could do given EC2 and time limits. I think the results at 20 nodes are very useful for understanding the performance:
Distributed set processing (theory)
What is SQL?
As you probably know, SQL stands for “structured query language”. It isn’t so much the language that is structured, but the data. Every SQL statement breaks down into a relational algebra equation. In algebra you learn that some operations are “distributable”: you can split them up into smaller units, and when you put those units back together the result is the same as if you hadn’t taken them apart. Projection, GROUP BY, SUM, COUNT, MIN*, and MAX* are distributable. With a little elbow grease, all non-distributable aggregate functions (AVG, STDDEV, VARIANCE, etc.) can be decomposed into distributable functions using simple substitution rules.
So, to recap, every SQL query is really a cleverly disguised relational algebra mathematical expression. With relational algebraic substitution, every aggregate expression, even non-distributable ones, can be broken into distributable sub-expressions.
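To make the substitution concrete, here is a minimal sketch (not Shard-Query’s actual code, and with made-up data) of decomposing AVG, which is not distributable as-is, into SUM and COUNT, which are:

```python
# Each "shard" holds a slice of the rows (hypothetical data).
shards = [
    [10, 20, 30],   # rows on shard 1
    [40, 50],       # rows on shard 2
]

# Distributable partial aggregates, computed independently per shard.
partials = [(sum(rows), len(rows)) for rows in shards]

# Recombination: SUM of SUMs divided by SUM of COUNTs yields the global AVG.
total_sum = sum(s for s, _ in partials)
total_count = sum(c for _, c in partials)
global_avg = total_sum / total_count

# Same result as computing AVG over all rows in one place.
all_rows = [v for rows in shards for v in rows]
assert global_avg == sum(all_rows) / len(all_rows)
```

STDDEV and VARIANCE decompose the same way, using partial SUM, SUM of squares, and COUNT.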
What is a result set?
This isn’t really a trick question. The “result set” is a SET created by the output of a relational algebra expression. Relational algebra expressions always produce sets as output. Just to drive it home: SQL is relational algebra, and this algebra operates only on sets. The next important thing to understand about SQL is that it is declarative; you tell the database what you want, but not how to get it. Most distributed engines work with rows. They break queries up into the lowest-level sets possible (rows), which doesn’t make much sense to me, since SQL is set oriented. I just said repeatedly that SQL doesn’t work on rows! Why break this paradigm by passing around rows instead of sets? From a distributed processing standpoint, rows are the worst case for performance; optimal mathematical performance requires operations on reduced sets. Keep in mind that row-based systems work well, but they are still much farther from optimal than working directly with relational algebra.
Materialized views techniques applied to distributed computation
I maintain another open source tool called Flexviews, which incrementally maintains materialized views. While writing Flexviews, I learned how to distribute the computation of aggregation queries over time. I realised that I could apply these same mathematical concepts to distributed queries as well, but with some minor differences.
Having written Flexviews, I understood that there are special materialized view optimizations that can be applied to INSERT-only workloads, and there are other optimizations that can be applied to views based on only a single base table. Knowing this, the query result set is treated as a materialized view over a union of the already joined and aggregated data from all the nodes. A single temporary table is used to store the results from all the nodes. Since we are projecting results, this is naturally an INSERT-only workload. The insertions into the base table from each node correspond logically to records in a Flexviews materialized view delta table, so the logic for applying changes via ON DUPLICATE KEY UPDATE is the same. All of the fastest incremental materialized view optimizations can be applied.
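The merge step can be sketched as follows. This uses SQLite’s UPSERT (ON CONFLICT ... DO UPDATE) as a stand-in for MySQL’s ON DUPLICATE KEY UPDATE; the table, columns, and node results are illustrative, not Shard-Query’s actual schema:

```python
import sqlite3

# In-memory stand-in for the coordination node's temporary table.
con = sqlite3.connect(":memory:")
con.execute("""CREATE TABLE aggregation_tmp (
    origin_airport_id INTEGER PRIMARY KEY,
    cnt INTEGER,
    sum_airtime INTEGER)""")

# Partial result sets arriving from two storage nodes; rows with the same
# key are merged additively, exactly like applying a materialized view delta.
node_results = [
    [(100, 5, 900), (200, 3, 400)],   # node 1: (airport, count, sum)
    [(100, 2, 300), (300, 1, 80)],    # node 2
]
for rows in node_results:
    con.executemany("""
        INSERT INTO aggregation_tmp VALUES (?, ?, ?)
        ON CONFLICT(origin_airport_id) DO UPDATE SET
            cnt = cnt + excluded.cnt,
            sum_airtime = sum_airtime + excluded.sum_airtime""", rows)

merged = con.execute(
    "SELECT * FROM aggregation_tmp ORDER BY origin_airport_id").fetchall()
# airport 100 was seen by both nodes: count 5+2=7, sum 900+300=1200
```

Because the merge is additive, it does not matter which order the node result sets arrive in.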
Shard-Query works only on sets
Shard-Query takes relational algebra to its logical conclusion, splitting a problem up into many small problems and then putting the results back together again. This set logic allows multiple levels of reduction of the set, all in parallel. All joins, aggregation, and filtering are done at the storage node level. This means that Shard-Query does not need to know anything about the structure of the data your queries operate on. It can, however, use a mapper for partition elimination. The mapper is pluggable.
On each storage node (or a mapped subset of nodes) the result set is aggregated using distributable aggregate functions. The results of this aggregation get a second distributed reduction via UPSERTs into the coordination node, which maintains the “materialized view” of the results. Finally, a single-threaded final GROUP BY reduction runs over the coordination node, projecting the final result set.
One or more Gearman queues are used to distribute work. Computation is massively parallel. Shard-Query simply waits for all the queries to finish, then projects the synchronously maintained, incrementally refreshable materialized view that represents the output. No external locking or synchronization is required. If a query fails on a storage node, it can be retried on the same node or, in the future, on a redundant node.
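The dispatch-and-wait pattern is simple. Shard-Query uses Gearman queues, but a thread pool illustrates the same idea: fire off every per-shard query, then just block until all of them return (the worker function and SQL below are placeholders, not Shard-Query internals):

```python
from concurrent.futures import ThreadPoolExecutor

def run_on_shard(shard_id, sql):
    # Stand-in for sending SQL to a storage node; a real worker would
    # connect to the node, execute the query, and return its partial rows.
    return [(shard_id, sql)]

sql = "SELECT ... GROUP BY 1"   # the rewritten per-shard query (placeholder)

with ThreadPoolExecutor(max_workers=4) as pool:
    # Submit one job per shard; they all run concurrently.
    futures = [pool.submit(run_on_shard, sid, sql) for sid in range(4)]
    # Blocking on every future is the only synchronization required.
    partials = [f.result() for f in futures]
```

A failed future can simply be resubmitted, which mirrors the retry behavior described above.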
This lets you work on problems of any size.
Set processing is massively parallel
In fact, it is embarrassingly parallel. Because Shard-Query works on sets and features pluggable partition mapping, it allows you to partition resources to any depth and distribute a query over that set of resources. Let’s say you have a database system that is at capacity in data center A, and there is not enough power to add new nodes there. You can add new nodes in data center B and distribute queries over both data centers. The response time should increase only by the average latency, since work is done in parallel and there is very little data shipping, thanks to the distributed reduction discussed above. If you hit any resource limitation in a cluster (CPU, memory, disk, power, etc.), split the problem up into more chunks.
Each Shard-Query instance is essentially a proxy for the distributed set-based SQL operations executed on the nodes under the proxy. The databases do 99.9% of the work, thanks to the multiple levels of result set reduction. Finally, Shard-Query can automatically partition sets into subsets using the basic relational algebra substitution rules documented in this blog post. This allows BETWEEN, IN, subqueries in the FROM clause, and UNION operations to operate fully in parallel.
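The IN-list rewrite, for example, turns one query into one independent query per value, each of which can run in parallel. A simplified string-substitution sketch (Shard-Query uses a real SQL parser, and the template below is illustrative):

```python
def split_in_list(sql_template, values):
    """Expand an {IN_LIST} placeholder into one query per IN-list value."""
    return [sql_template.format(IN_LIST=str(v)) for v in values]

# Hypothetical query template over the ontime schema used in this post.
template = ("SELECT Year, COUNT(*) FROM ontime_fact "
            "JOIN dim_date ON ontime_fact.date_id = dim_date.date_id "
            "WHERE dim_date.Year IN ({IN_LIST}) GROUP BY 1")

queries = split_in_list(template, [2008, 2009])
# Two independent queries, one filtering IN (2008) and one IN (2009).
```

BETWEEN is handled the same way, by expanding the range into sub-ranges before splitting.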
Distributed set processing is database agnostic.
Keep in mind that at each level of partitioning, only a list of servers and some credentials are required to make this work. It can even be set up without the aid of your database administrator.
As long as all of the nodes share a common schema model and a common SQL dialect, they can all participate in the distributed query processing. There is almost no data shipping, as only aggregated sets are sent over the network. In the future you will be able to distribute work over any type of compute resource which speaks SQL, but right now only MySQL storage nodes are supported. Amdahl’s law applies to the distributed processing: the slowest node places a lower bound on the overall response time.
Soon you will be able to set up computation over ICE, InnoDB, Vectorwise, and other databases, all at the same time, transparently to the source query. Adding a new storage node SQL dialect is almost trivial. I’ll document that process shortly; I think I need to make some minor abstraction modifications in the data access layer first.
Shard-Query can provide query execution plans based on the relational algebra rewrites
So, here is an example of such a plan for a simple query with aggregation, a JOIN, and a WHERE clause that uses an IN clause:
-- INPUT SQL:
select origin_airport_id, count(*), sum(AirTime), sum(DepDelay),
       sum(DepDelay >= 0) flight_delayed
  from ontime_fact
  join dim_date on ontime_fact.date_id = dim_date.date_id
 where dim_date.Year IN (2008,2009)
 group by 1;

-- PARALLEL OPTIMIZATIONS:
* Base level table name: `aggregation_tmp_74082863`
* IN list optimization enabled
* Detected an IN list with 2 items
* IN list compression is not required
  -- 2 items is less than the inlist-merge-threshold of 128 items
* The following projections were selected for a UNIQUE CHECK
  on the base table: `origin_airport_id`
* storage node result set merge optimization enabled:
  ON DUPLICATE KEY UPDATE
    `origin_airport_id`=VALUES(`origin_airport_id`),
    `count(*)`=`count(*)` + VALUES(`count(*)`),
    `sum(AirTime)`=`sum(AirTime)` + VALUES(`sum(AirTime)`),
    `sum(DepDelay)`=`sum(DepDelay)` + VALUES(`sum(DepDelay)`),
    `sum(DepDelay >= 0) flight_delayed`=`sum(DepDelay >= 0) flight_delayed` + VALUES(`sum(DepDelay >= 0) flight_delayed`)

-- SQL TO SEND TO SHARDS:
Array
(
    [0] => SELECT origin_airport_id AS `origin_airport_id`, COUNT(*) AS `count(*)`,
           SUM((AirTime)) AS `sum(AirTime)`, SUM((DepDelay)) AS `sum(DepDelay)`,
           SUM((DepDelay >= 0)) AS `sum(DepDelay >= 0) flight_delayed`
           FROM ontime_fact AS `ontime_fact`
           JOIN dim_date AS `dim_date` ON (ontime_fact.date_id = dim_date.date_id)
           WHERE dim_date.Year IN (2008) GROUP BY 1 ORDER BY NULL
    [1] => SELECT origin_airport_id AS `origin_airport_id`, COUNT(*) AS `count(*)`,
           SUM((AirTime)) AS `sum(AirTime)`, SUM((DepDelay)) AS `sum(DepDelay)`,
           SUM((DepDelay >= 0)) AS `sum(DepDelay >= 0) flight_delayed`
           FROM ontime_fact AS `ontime_fact`
           JOIN dim_date AS `dim_date` ON (ontime_fact.date_id = dim_date.date_id)
           WHERE dim_date.Year IN (2009) GROUP BY 1 ORDER BY NULL
)

-- AGGREGATION SQL:
SELECT `origin_airport_id`,
       SUM(`count(*)`) AS `count(*)`,
       SUM(`sum(AirTime)`) AS `sum(AirTime)`,
       SUM(`sum(DepDelay)`) AS `sum(DepDelay)`,
       SUM(`sum(DepDelay >= 0) flight_delayed`) AS `sum(DepDelay >= 0) flight_delayed`
  FROM `aggregation_tmp_74082863`
 GROUP BY 1

-- POST EXEC:
DROP TABLE IF EXISTS `aggregation_tmp_74082863`;
*MIN/MAX are only distributable in INSERT-only workloads
See this information about maintaining materialized views: