While Shard-Query can work over multiple nodes, this blog post focuses on using Shard-Query with a single node. Shard-Query can add parallelism to queries that use partitioned tables, and very large tables can often be partitioned fairly easily. Because each partition can be queried independently, Shard-Query can leverage partitioning to add parallelism. MySQL 5.6 supports the partition hint, so on 5.6 Shard-Query can add parallelism to any partitioning method (even subpartitioning); on earlier versions it is limited to the RANGE and LIST partitioning methods.
The Shard-Query output shown here comes from the command-line client, but you can also use MySQL Proxy to communicate with Shard-Query.
In the examples I am going to use the schema from the Star Schema Benchmark. I generated data for scale factor 10, which means about 6GB of data in the largest table. I am going to show a few different queries, and explain how Shard-Query executes them in parallel.
Here is the DDL for the lineorder table, which I will use for the demo queries:
CREATE TABLE IF NOT EXISTS lineorder
(
  LO_OrderKey bigint not null,
  LO_LineNumber tinyint not null,
  LO_CustKey int not null,
  LO_PartKey int not null,
  LO_SuppKey int not null,
  LO_OrderDateKey int not null,
  LO_OrderPriority varchar(15),
  LO_ShipPriority char(1),
  LO_Quantity tinyint,
  LO_ExtendedPrice decimal,
  LO_OrdTotalPrice decimal,
  LO_Discount decimal,
  LO_Revenue decimal,
  LO_SupplyCost decimal,
  LO_Tax tinyint,
  LO_CommitDateKey int not null,
  LO_ShipMode varchar(10),
  primary key(LO_OrderDateKey,LO_PartKey,LO_SuppKey,LO_Custkey,LO_OrderKey,LO_LineNumber)
) PARTITION BY HASH(LO_OrderDateKey) PARTITIONS 8;
Notice that the lineorder table is partitioned by HASH(LO_OrderDateKey) into 8 partitions. I used 8 partitions and my test box has 4 cores. It does not hurt to have more partitions than cores. A number of partitions that is two or three times the number of cores generally works best because it keeps each partition small, and smaller partitions are faster to scan. If you have a very large table, a larger number of partitions may be acceptable. Shard-Query will submit a query to Gearman for each partition, and the number of Gearman workers controls the parallelism.
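To make the relationship between partitions and workers concrete, here is a minimal Python sketch. It is not Shard-Query or Gearman code: a thread pool stands in for the Gearman workers, and `run_on_worker` is a hypothetical placeholder that does not talk to MySQL. It only shows that eight per-partition queries are queued while at most four run at any moment.

```python
from concurrent.futures import ThreadPoolExecutor

PARTITIONS = 8  # matches PARTITIONS 8 in the DDL above
WORKERS = 4     # stands in for the number of Gearman workers (assumption)


def partition_query(partition_id: int) -> str:
    """Build one per-partition query using the MySQL 5.6 partition hint."""
    return f"SELECT COUNT(*) FROM lineorder PARTITION(p{partition_id})"


def run_on_worker(sql: str) -> str:
    """Hypothetical stand-in for a worker that would execute sql on MySQL."""
    return f"done: {sql}"


queries = [partition_query(i) for i in range(PARTITIONS)]

# At most WORKERS queries run at once; the remaining ones wait in the queue.
with ThreadPoolExecutor(max_workers=WORKERS) as pool:
    results = list(pool.map(run_on_worker, queries))
```

With more partitions than workers, each worker simply picks up the next queued partition when it finishes, which is why extra partitions do not hurt.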
The SQL for the first demo is:
SELECT COUNT(DISTINCT LO_OrderDateKey) FROM lineorder;
Here is the explain from regular MySQL:
mysql> explain select count(distinct LO_OrderDateKey) from lineorder\G
*************************** 1. row ***************************
           id: 1
  select_type: SIMPLE
        table: lineorder
         type: index
possible_keys: PRIMARY
          key: PRIMARY
      key_len: 25
          ref: NULL
         rows: 58922188
        Extra: Using index
1 row in set (0.00 sec)
So it is basically a full table scan. It takes a long time:
mysql> select count(distinct LO_OrderDateKey) from lineorder;
+---------------------------------+
| count(distinct LO_OrderDateKey) |
+---------------------------------+
|                            2406 |
+---------------------------------+
1 row in set (4 min 48.63 sec)
Shard-Query executes this query differently from MySQL. It sends one query per partition, in parallel, like the following:
Array
(
    [0] => SELECT LO_OrderDateKey AS expr_2839651562
FROM lineorder PARTITION(p0) AS `lineorder` WHERE 1=1 AND 1=1 GROUP BY LO_OrderDateKey
    [1] => SELECT LO_OrderDateKey AS expr_2839651562
FROM lineorder PARTITION(p1) AS `lineorder` WHERE 1=1 AND 1=1 GROUP BY LO_OrderDateKey
    [2] => SELECT LO_OrderDateKey AS expr_2839651562
FROM lineorder PARTITION(p2) AS `lineorder` WHERE 1=1 AND 1=1 GROUP BY LO_OrderDateKey
    [3] => SELECT LO_OrderDateKey AS expr_2839651562
FROM lineorder PARTITION(p3) AS `lineorder` WHERE 1=1 AND 1=1 GROUP BY LO_OrderDateKey
    [4] => SELECT LO_OrderDateKey AS expr_2839651562
FROM lineorder PARTITION(p4) AS `lineorder` WHERE 1=1 AND 1=1 GROUP BY LO_OrderDateKey
    [5] => SELECT LO_OrderDateKey AS expr_2839651562
FROM lineorder PARTITION(p5) AS `lineorder` WHERE 1=1 AND 1=1 GROUP BY LO_OrderDateKey
    [6] => SELECT LO_OrderDateKey AS expr_2839651562
FROM lineorder PARTITION(p6) AS `lineorder` WHERE 1=1 AND 1=1 GROUP BY LO_OrderDateKey
    [7] => SELECT LO_OrderDateKey AS expr_2839651562
FROM lineorder PARTITION(p7) AS `lineorder` WHERE 1=1 AND 1=1 GROUP BY LO_OrderDateKey
)
You will notice that there is one query for each partition. Those queries are sent to Gearman and executed in parallel by as many Gearman workers as are available (in this case, 4). The output of the queries goes into a coordinator table, and then another query does the final aggregation. That query looks like this:
SELECT COUNT(distinct expr_2839651562) AS `count`
FROM `aggregation_tmp_73522490`
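The two-step rewrite of COUNT(DISTINCT) can be sketched in a few lines of Python. This is only an illustration of the logic, with made-up date keys and in-memory lists standing in for partitions and the coordinator table; the function names are hypothetical and are not part of Shard-Query.

```python
def partition_distinct(rows):
    """Per-partition step: like SELECT col ... GROUP BY col, one row per value."""
    return sorted(set(rows))


def final_count_distinct(partials):
    """Coordinator step: COUNT(DISTINCT col) over everything the partitions sent."""
    collected = [value for partial in partials for value in partial]
    return len(set(collected))


# Toy date keys spread over three partitions (illustrative values only).
partitions = [
    [19920101, 19920101, 19920109],
    [19920102, 19920102],
    [19920103, 19920111, 19920103],
]

partials = [partition_distinct(rows) for rows in partitions]
count = final_count_distinct(partials)
```

Each partition returns only its distinct values, so the coordinator table stays small even when the base table is large.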
The Shard-Query time:
select count(distinct LO_OrderDateKey) from lineorder;
Array
(
    [count ] => 2406
)
1 rows returned
Exec time: 0.10923719406128
That isn’t a typo: it really is sub-second, compared to minutes in regular MySQL.
This is because Shard-Query uses GROUP BY to answer this query and a loose index scan of the PRIMARY KEY is possible:
mysql> explain partitions SELECT LO_OrderDateKey AS expr_2839651562
    -> FROM lineorder PARTITION(p7) AS `lineorder` WHERE 1=1 AND 1=1 GROUP BY LO_OrderDateKey
    -> \G
*************************** 1. row ***************************
           id: 1
  select_type: SIMPLE
        table: lineorder
   partitions: p7
         type: range
possible_keys: PRIMARY
          key: PRIMARY
      key_len: 4
          ref: NULL
         rows: 80108
        Extra: Using index for group-by
1 row in set (0.00 sec)
Next, I will test another simple query, first on regular MySQL:
mysql> select count(*) from lineorder;
+----------+
| count(*) |
+----------+
| 59986052 |
+----------+
1 row in set (4 min 8.70 sec)
Again, the EXPLAIN shows a full table scan:
mysql> explain select count(*) from lineorder\G
*************************** 1. row ***************************
           id: 1
  select_type: SIMPLE
        table: lineorder
         type: index
possible_keys: NULL
          key: PRIMARY
      key_len: 25
          ref: NULL
         rows: 58922188
        Extra: Using index
1 row in set (0.00 sec)
Now, Shard-Query can’t do anything special to speed up this query, except to execute it in parallel, similar to the first query:
[0] => SELECT COUNT(*) AS expr_3190753946
FROM lineorder PARTITION(p0) AS `lineorder` WHERE 1=1 AND 1=1
[1] => SELECT COUNT(*) AS expr_3190753946
FROM lineorder PARTITION(p1) AS `lineorder` WHERE 1=1 AND 1=1
[2] => SELECT COUNT(*) AS expr_3190753946
FROM lineorder PARTITION(p2) AS `lineorder` WHERE 1=1 AND 1=1
[3] => SELECT COUNT(*) AS expr_3190753946
FROM lineorder PARTITION(p3) AS `lineorder` WHERE 1=1 AND 1=1
...
The aggregation SQL is similar, but this time the aggregate function is changed to SUM to combine the COUNT from each partition:
SELECT SUM(expr_3190753946) AS `count`
FROM `aggregation_tmp_51969525`
And the query is quite a bit faster at 140.24 seconds, compared with MySQL’s 248.7 seconds:
Array
(
    [count ] => 59986052
)
1 rows returned
Exec time: 140.24419403076
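A small Python sketch shows why the final aggregate has to be SUM rather than COUNT. The partitions below are made-up in-memory lists, not real data; they only illustrate the arithmetic.

```python
# Toy partitions holding 3, 5, 0 and 2 rows (illustrative values only).
partitions = [["row"] * 3, ["row"] * 5, [], ["row"] * 2]

# Per-partition step: SELECT COUNT(*) ... PARTITION(pN) returns one number each.
partial_counts = [len(rows) for rows in partitions]

# Coordinator step: SUM of the partial counts gives the table-wide COUNT(*).
total = sum(partial_counts)

# Counting the coordinator rows instead would only report how many
# partitions answered, not how many rows the table holds.
rows_in_coordinator = len(partial_counts)
```

Here `total` is the row count of the whole table, while `rows_in_coordinator` is just the number of partitions.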
Finally, I want to look at a more complex query that uses joins and aggregation.
mysql> explain select d_year, c_nation, sum(lo_revenue - lo_supplycost) as profit
from lineorder
join dim_date on lo_orderdatekey = d_datekey
join customer on lo_custkey = c_customerkey
join supplier on lo_suppkey = s_suppkey
join part on lo_partkey = p_partkey
where c_region = 'AMERICA' and s_region = 'AMERICA' and (p_mfgr = 'MFGR#1' or p_mfgr = 'MFGR#2')
group by d_year, c_nation order by d_year, c_nation;
+----+-------------+-----------+--------+---------------+---------+---------+--------------------------+------+---------------------------------+
| id | select_type | table     | type   | possible_keys | key     | key_len | ref                      | rows | Extra                           |
+----+-------------+-----------+--------+---------------+---------+---------+--------------------------+------+---------------------------------+
|  1 | SIMPLE      | dim_date  | ALL    | PRIMARY       | NULL    | NULL    | NULL                     |    5 | Using temporary; Using filesort |
|  1 | SIMPLE      | lineorder | ref    | PRIMARY       | PRIMARY | 4       | ssb.dim_date.D_DateKey   |   89 | NULL                            |
|  1 | SIMPLE      | supplier  | eq_ref | PRIMARY       | PRIMARY | 4       | ssb.lineorder.LO_SuppKey |    1 | Using where                     |
|  1 | SIMPLE      | customer  | eq_ref | PRIMARY       | PRIMARY | 4       | ssb.lineorder.LO_CustKey |    1 | Using where                     |
|  1 | SIMPLE      | part      | eq_ref | PRIMARY       | PRIMARY | 4       | ssb.lineorder.LO_PartKey |    1 | Using where                     |
+----+-------------+-----------+--------+---------------+---------+---------+--------------------------+------+---------------------------------+
5 rows in set (0.01 sec)
Here is the query on regular MySQL:
mysql> select d_year, c_nation, sum(lo_revenue - lo_supplycost) as profit
from lineorder
join dim_date on lo_orderdatekey = d_datekey
join customer on lo_custkey = c_customerkey
join supplier on lo_suppkey = s_suppkey
join part on lo_partkey = p_partkey
where c_region = 'AMERICA' and s_region = 'AMERICA' and (p_mfgr = 'MFGR#1' or p_mfgr = 'MFGR#2')
group by d_year, c_nation order by d_year, c_nation;
+--------+---------------+--------------+
| d_year | c_nation      | profit       |
+--------+---------------+--------------+
|   1992 | ARGENTINA     | 102741829748 |
...
|   1998 | UNITED STATES |  61345891337 |
+--------+---------------+--------------+
35 rows in set (11 min 56.79 sec)
Again, Shard-Query splits up the query to run over each partition (I won’t bore you with the details) and executes it faster than MySQL, in 343.3 seconds compared to about 717 seconds:
Array
(
    [d_year] => 1998
    [c_nation] => UNITED STATES
    [profit] => 61345891337
)
35 rows returned
Exec time: 343.29854893684
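For a grouped query like this one, each partition returns partial SUMs per (d_year, c_nation) group, and the coordinator has to add together the partials that share a group key. The Python sketch below illustrates that merge with a dictionary; the profit values are invented and `merge_partials` is a hypothetical helper, not Shard-Query's actual code.

```python
from collections import defaultdict


def merge_partials(partials):
    """Add up partial sums that share the same (d_year, c_nation) group key."""
    merged = defaultdict(int)
    for partial in partials:
        for group_key, profit in partial:
            merged[group_key] += profit
    # Sorting the keys mirrors ORDER BY d_year, c_nation.
    return sorted(merged.items())


# Partial results from two partitions (toy numbers, not benchmark data).
partial_p0 = [((1992, "ARGENTINA"), 100), ((1992, "BRAZIL"), 40)]
partial_p1 = [((1992, "ARGENTINA"), 25), ((1993, "BRAZIL"), 10)]

rows = merge_partials([partial_p0, partial_p1])
```

SUM can be merged this way because adding partial sums gives the same answer as summing all rows at once; an aggregate such as AVG would need both a sum and a count from each partition.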
I hope you see how using Shard-Query can speed up queries without using sharding, on just a single server. All you really need to do is add partitioning.
You can get Shard-Query from GitHub at https://github.com/greenlion/swanhart-tools
Please note: Configure and install Shard-Query as normal, but simply use one node and set the column option (the shard column) to “nocolumn” or false, because you are not required to use a shard column if you are not sharding.
Are the results always consistent?
From the wiki: “Shard-Query behaves like READ-COMMITTED with respect to each query.” Does it mean the full query or queries on each partition?
The scan of each partition is consistent. You could see changing data if a partition is changed before Shard-Query starts querying it.
I set up Shard-Query and ran a simple query (select count(*) from mtrack_log_error) using php run_query, and got an error. The query was rewritten to the following (the WHERE clause comes after the AND):
Array
(
[0] => SELECT COUNT(*) AS expr_1564968823
FROM mtrack_log_error AS mtrack_log_error
AND UNIX_TIMESTAMP(time) < (1396328400) WHERE 1=1 AND 1=1
[1] => SELECT COUNT(*) AS expr_1564968823
FROM mtrack_log_error AS mtrack_log_error
AND UNIX_TIMESTAMP(time) >= (1396328400) AND UNIX_TIMESTAMP(time) < (1398920400) WHERE 1=1 AND 1=1
[2] => SELECT COUNT(*) AS expr_1564968823
FROM mtrack_log_error AS mtrack_log_error
AND UNIX_TIMESTAMP(time) >= (1398920400) AND UNIX_TIMESTAMP(time) < (1401598800) WHERE 1=1 AND 1=1
)
The table is partitioned by time.
I checked the fix for this problem into git. Grab the latest version at http://github.com/greenlion/swanhart-tools
Thanks for the fix, there is no SQL syntax error now, but I still have a problem when the query is rewritten to
SELECT COUNT(*) AS count(*) FROM mtrack_log_error AS mtrack_log_error
WHERE 1=1 AND UNIX_TIMESTAMP(time)
explain partitions select count(*) from mtrack_log_error where 1=1 AND UNIX_TIMESTAMP(time)
explain partitions select count(*) from mtrack_log_error where 1=1 AND time < from_unixtime(1396328400) ORDER BY NULL;
+----+-------------+------------------+------------+-------+---------------+------------+---------+------+----------+--------------------------+
| id | select_type | table            | partitions | type  | possible_keys | key        | key_len | ref  | rows     | Extra                    |
+----+-------------+------------------+------------+-------+---------------+------------+---------+------+----------+--------------------------+
|  1 | SIMPLE      | mtrack_log_error | p1403      | index | NULL          | error_time | 6       | NULL | 88128625 | Using where; Using index |
+----+-------------+------------------+------------+-------+---------------+------------+---------+------+----------+--------------------------+
so it will only scan one partition.
Thanks
Not sure why my previous post is a mess; it is missing lots of stuff. What I mean is that the query
select count(*) from mtrack_log_error where 1=1 AND UNIX_TIMESTAMP(time) < (1396328400) ORDER BY NULL;
should be changed to
select count(*) from mtrack_log_error where 1=1 AND time < from_unixtime(1396328400) ORDER BY NULL;
so that it scans only one partition instead of all partitions.
Thanks
Also, I got the error below in the final stage:
ERRORS RETURNED BY OPERATION:
Array
(
[0] => [message:Error while inserting: INSERT INTO aggregation_tmp_5974337 VALUES (144897986) ON DUPLICATE KEY UPDATE count(*)=count(*) + VALUES(count(*)):1317Query execution was interrupted] [node:shard1] [arc:0] [insert_id:0]
[1] => [message:Error while inserting: INSERT INTO aggregation_tmp_5974337 VALUES (95701615) ON DUPLICATE KEY UPDATE count(*)=count(*) + VALUES(count(*)):1317Query execution was interrupted] [node:shard1] [arc:0] [insert_id:0]
[2] => [message:Error while inserting: INSERT INTO aggregation_tmp_5974337 VALUES (0) ON DUPLICATE KEY UPDATE count(*)=count(*) + VALUES(count(*)):1317Query execution was interrupted] [node:shard1] [arc:0] [insert_id:0]
)
no query results
Exec time: 192.84175181389
I think you are having problems because “time” is a function name in MySQL, and you are also using it as a column name. Shard-Query does not pass functions down to the shards, only columns, and it doesn’t think that time is a column, but a function.
I will need to think about how to fix this.
Do you mean the partition problem? If you change from UNIX_TIMESTAMP(time) < (1396328400) to time
[message:Error while inserting: INSERT INTO aggregation_tmp_5974337 VALUES (144897986) ON DUPLICATE KEY UPDATE count(*)=count(*) + VALUES(count(*)):1317Query execution was interrupted] [node:shard1] [arc:0] [insert_id:0]
Thanks
If the query only accesses one partition due to the WHERE clause, then Shard-Query won’t scan all the partitions as there is no data of interest in the other partitions.
The table has three partitions (three months of data). When I run the query select count(*) from mtrack_log_error, it is rewritten to three queries, like the ones below:
select count(*) from mtrack_log_error where 1=1 AND UNIX_TIMESTAMP(time) < (1396328400) ORDER BY NULL;
select count(*) from mtrack_log_error where 1=1 AND UNIX_TIMESTAMP(time) >= (1396328400) and UNIX_TIMESTAMP(time) < (1398920400) ORDER BY NULL;
select count(*) from mtrack_log_error where 1=1 AND UNIX_TIMESTAMP(time) >= (1398920400) and UNIX_TIMESTAMP(time) < (1401598800) ORDER BY NULL;
Each query only returns one month of data, but it still scans all three partitions because it uses UNIX_TIMESTAMP(time) < (1396328400), which is just like using a function on an indexed column. It should be changed to time < from_unixtime(1396328400).
I still get the error below in the final stage, and I am not sure how to fix it:
ERRORS RETURNED BY OPERATION:
Array
(
[0] => [message:Error while inserting: INSERT INTO aggregation_tmp_5974337 VALUES (144897986) ON DUPLICATE KEY UPDATE count(*)=count(*) + VALUES(count(*)):1317Query execution was interrupted] [node:shard1] [arc:0] [insert_id:0]
[1] => [message:Error while inserting: INSERT INTO aggregation_tmp_5974337 VALUES (95701615) ON DUPLICATE KEY UPDATE count(*)=count(*) + VALUES(count(*)):1317Query execution was interrupted] [node:shard1] [arc:0] [insert_id:0]
[2] => [message:Error while inserting: INSERT INTO aggregation_tmp_5974337 VALUES (0) ON DUPLICATE KEY UPDATE count(*)=count(*) + VALUES(count(*)):1317Query execution was interrupted] [node:shard1] [arc:0] [insert_id:0]
)
no query results
Exec time: 192.84175181389
It looks like you are using an old version of Shard-Query. The latest version should print output like this with --verbose:
SQL TO SEND TO SHARDS:
Array
(
[0] => SELECT COUNT(*) AS expr_2828757189
FROM mtrack_log_error AS mtrack_log_error
WHERE 1=1 AND UNIX_TIMESTAMP(time)
SELECT COUNT(*) AS expr_2828757189
FROM mtrack_log_error AS mtrack_log_error
WHERE 1=1 AND UNIX_TIMESTAMP(time) >= (1396328400) AND UNIX_TIMESTAMP(time)
SELECT COUNT(*) AS expr_2828757189
FROM mtrack_log_error AS mtrack_log_error
WHERE 1=1 AND UNIX_TIMESTAMP(time) >= (1398920400) AND UNIX_TIMESTAMP(time)
2)
1 rows returned
Exec time: 0.019656181335449
You can get the latest version from http://github.com/greenlion/swanhart-tools
I downloaded Shard-Query from http://github.com/greenlion/swanhart-tools on June 2, so I am not sure whether it is the old version or not. I have downloaded it again. Do I just need to copy shard-query to /usr/shar/, or do I need to redo all the setup after the copy?
Thanks
Nice work. Would you give an example of how “Shard-Query can work over multiple nodes”?
How fast does
SELECT COUNT(DISTINCT LO_OrderDateKey) FROM lineorder;
run with partitioning removed? (I suspect this is an extreme example of partitioning hurting.) In your tests, was all the data cached in RAM? Was it small enough to fit in RAM, or necessarily I/O-bound? I suspect Shard-Query shines in the first case.
It looks like shard-query -or gearman library- has some issues with foreign accent characters, if a column has such characters, the result value comes as NULL. Have you experienced that? How can I fix it?
I would like to know how I can get the lineorder table (data set) to follow this example.