While Shard-Query can work over multiple nodes, this blog post focuses on using Shard-Query with a single node. Shard-Query can add parallelism to queries that use partitioned tables. Very large tables can often be partitioned fairly easily, and Shard-Query can leverage that partitioning to add parallelism, because each partition can be queried independently. Because MySQL 5.6 supports the partition hint, Shard-Query can add parallelism to any partitioning method (even subpartitioning) on 5.6, but it is limited to the RANGE/LIST partitioning methods on earlier versions.
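To make the partition hint concrete, here is a minimal sketch of how one query per partition could be generated with the MySQL 5.6 `PARTITION (...)` clause. The helper name `partition_queries` and the `COUNT(*)` select list are illustrative assumptions, not Shard-Query's own code or output.

```python
def partition_queries(table, select_list, partitions, where=None):
    """Build one SELECT per partition using the MySQL 5.6+ PARTITION (...) clause.

    Hypothetical helper for illustration; Shard-Query's real rewriter is more involved.
    """
    queries = []
    for name in partitions:
        sql = f"SELECT {select_list} FROM {table} PARTITION ({name})"
        if where:
            sql += f" WHERE {where}"
        queries.append(sql)
    return queries


# HASH partitions get the default names p0..p(N-1) unless they are named explicitly.
partition_names = [f"p{i}" for i in range(8)]
queries = partition_queries("lineorder", "COUNT(*) AS cnt", partition_names)
for q in queries:
    print(q)
```

Each generated statement touches exactly one partition, which is what allows the statements to run independently of one another.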

The Shard-Query output shown here is from the command-line client, but you can also use MySQL Proxy to communicate with Shard-Query.

In the examples I am going to use the schema from the Star Schema Benchmark.  I generated data for scale factor 10, which means about 6GB of data in the largest table. I am going to show a few different queries, and explain how Shard-Query executes them in parallel.

Here is the DDL for the lineorder table, which I will use for the demo queries:

Notice that the lineorder table is partitioned by HASH(LO_OrderDateKey) into 8 partitions.  I used 8 partitions and my test box has 4 cores. It does not hurt to have more partitions than cores. A number of partitions that is two or three times the number of cores generally works best because it keeps each partition small, and smaller partitions are faster to scan. If you have a very large table, a larger number of partitions may be acceptable. Shard-Query will submit a query to Gearman for each partition, and the number of Gearman workers controls the parallelism.
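The relationship between partitions and workers can be sketched with a plain thread pool: 8 partition tasks are queued, but no more than 4 run at once. This is only an analogy for the Gearman setup described above. The `scan_partition` function and the sleep that stands in for a partition scan are assumptions made for illustration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

NUM_PARTITIONS = 8  # one task per partition
NUM_WORKERS = 4     # stands in for the number of Gearman workers

lock = threading.Lock()
running = 0  # tasks executing right now
peak = 0     # highest concurrency observed


def scan_partition(n):
    """Pretend to scan partition n while tracking how many scans overlap."""
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    time.sleep(0.05)  # stand-in for the real partition scan
    with lock:
        running -= 1
    return n


with ThreadPoolExecutor(max_workers=NUM_WORKERS) as pool:
    finished = list(pool.map(scan_partition, range(NUM_PARTITIONS)))

print(f"partitions scanned: {len(finished)}, peak concurrency: {peak}")
```

The peak concurrency never exceeds the worker count, so adding partitions beyond the number of workers makes each task smaller rather than making more tasks run at once.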

The SQL for the first demo is:

Here is the explain from regular MySQL:

 

So it is basically a full table scan. It takes a long time:

 

Shard-Query executes this query differently from MySQL. It sends one query per partition, in parallel, like the following queries:

You will notice that there is one query for each partition. Those queries are sent to Gearman and executed in parallel by as many Gearman workers as are available (in this case, 4). The output of the queries goes into a coordinator table, and then another query does a final aggregation. That query looks like this:

The Shard-Query time:

That isn’t a typo; it really is sub-second, compared to minutes in regular MySQL.

This is because Shard-Query uses GROUP BY to answer this query, and a loose index scan of the PRIMARY KEY is possible:
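The overall flow (one GROUP BY query per partition, results collected in a coordinator table, then one final aggregation) can be imitated in a small self-contained sketch. It assumes a COUNT(DISTINCT LO_OrderDateKey) style query, emulates the 8 partitions as separate SQLite tables, uses made-up data, and runs the partition queries one after another rather than in parallel. It is not the SQL that Shard-Query actually generates.

```python
import sqlite3

NUM_PARTITIONS = 8
conn = sqlite3.connect(":memory:")

# Emulate HASH(LO_OrderDateKey) partitions as separate tables (SQLite has no partitioning).
for p in range(NUM_PARTITIONS):
    conn.execute(f"CREATE TABLE lineorder_p{p} (LO_OrderDateKey INTEGER)")

# Toy data: 3 rows for each of 20 made-up date keys, routed by key modulo partition count.
date_keys = [19920101 + d for d in range(20)]
for key in date_keys:
    for _ in range(3):
        conn.execute(
            f"INSERT INTO lineorder_p{key % NUM_PARTITIONS} VALUES (?)", (key,)
        )

# Scatter: each partition query collapses its rows with GROUP BY and
# writes the reduced result into the coordinator table.
conn.execute("CREATE TABLE coordinator (LO_OrderDateKey INTEGER)")
for p in range(NUM_PARTITIONS):
    conn.execute(
        "INSERT INTO coordinator "
        f"SELECT LO_OrderDateKey FROM lineorder_p{p} GROUP BY LO_OrderDateKey"
    )

# Gather: a single final aggregation over the much smaller coordinator table.
distinct_keys = conn.execute(
    "SELECT COUNT(DISTINCT LO_OrderDateKey) FROM coordinator"
).fetchone()[0]
coordinator_rows = conn.execute("SELECT COUNT(*) FROM coordinator").fetchone()[0]
print(distinct_keys, coordinator_rows)
```

The point of the sketch is that the coordinator table holds one row per distinct key per partition instead of every source row, so the final aggregation has very little work left to do.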

Next another simple query will be tested, first on regular MySQL:

Again, the EXPLAIN shows a full table scan:

Now, Shard-Query can’t do anything special to speed up this query, except to execute it in parallel, similar to the first query:

The aggregation SQL is similar, but this time the aggregate function is changed to SUM to combine the COUNT from each partition:

And the query is quite a bit faster, at 140.24 seconds compared with MySQL’s 248.7 seconds:
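The COUNT-to-SUM rewrite described above can be sketched in the same way: each partition reports its own COUNT(*), and the final query adds the partial counts together. The tables, the `cnt` column name, and the row counts below are made up for illustration, and the partition queries run sequentially here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
rows_per_partition = [5, 0, 12, 7, 3, 9, 1, 4]  # made-up row counts for 8 partitions

# Emulate each partition as its own table and fill it with dummy rows.
for p, n in enumerate(rows_per_partition):
    conn.execute(f"CREATE TABLE lineorder_p{p} (LO_OrderKey INTEGER)")
    conn.executemany(
        f"INSERT INTO lineorder_p{p} VALUES (?)", [(i,) for i in range(n)]
    )

# Each partition query produces a partial COUNT(*) that lands in the coordinator table.
conn.execute("CREATE TABLE coordinator (cnt INTEGER)")
for p in range(len(rows_per_partition)):
    conn.execute(f"INSERT INTO coordinator SELECT COUNT(*) FROM lineorder_p{p}")

# The final aggregation uses SUM, because counting the partial rows would
# only return the number of partitions.
total = conn.execute("SELECT SUM(cnt) FROM coordinator").fetchone()[0]
partials = conn.execute("SELECT COUNT(*) FROM coordinator").fetchone()[0]
print(total, partials)
```

Applying COUNT(*) again in the final step would return 8 (the number of partial results), which is why the outer aggregate has to change to SUM.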

Finally, I want to look at a more complex query that uses joins and aggregation.

Here is the query on regular MySQL:

Again, Shard-Query splits up the query to run over each partition (I won’t bore you with the details), and it executes the query faster than MySQL, in 343.3 seconds compared to ~720 seconds:
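For a sense of scale, the timings quoted in this post can be turned into speedup figures. This is a small sketch that assumes 4 parallel workers as the ideal ceiling and treats the ~720 second figure as exactly 720.

```python
WORKERS = 4  # assumption: the 4 Gearman workers mentioned earlier

# (regular MySQL seconds, Shard-Query seconds), taken from the timings quoted in the post
timings = {
    "COUNT query": (248.7, 140.24),
    "join + aggregation query": (720.0, 343.3),  # the MySQL time is approximate (~720)
}

speedups = {}
for name, (mysql_s, shard_query_s) in timings.items():
    speedup = mysql_s / shard_query_s
    speedups[name] = speedup
    # Fraction of the ideal WORKERS-fold speedup that was actually achieved.
    efficiency = speedup / WORKERS
    print(f"{name}: {speedup:.2f}x faster, {efficiency:.0%} of ideal {WORKERS}-way scaling")
```

Both queries come out at roughly 1.8x and 2.1x respectively, well short of 4x, which fits the picture of full scans competing for the same single server's resources.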

I hope you see how using Shard-Query can speed up queries without using sharding, on just a single server. All you really need to do is add partitioning.

You can get Shard-Query from GitHub at https://github.com/greenlion/swanhart-tools

Please note: Configure and install Shard-Query as normal, but simply use one node and set the column option (the shard column) to “nocolumn” or false, because you are not required to use a shard column if you are not sharding.

19 Comments
Patryk Pomykalski

Are the results always consistent?
From the wiki: “Shard-Query behaves like READ-COMMITTED with respect to each query.” Does it mean the full query or queries on each partition?

john

I set up Shard-Query and ran a simple query (select count(*) from mtrack_log_error) using php run_query, and got an error. The query was rewritten to the following (the WHERE clause comes after the AND):
Array
(
[0] => SELECT COUNT(*) AS expr_1564968823
FROM mtrack_log_error AS mtrack_log_error AND UNIX_TIMESTAMP(time) < (1396328400) WHERE 1=1 AND 1=1
[1] => SELECT COUNT(*) AS expr_1564968823
FROM mtrack_log_error AS mtrack_log_error AND UNIX_TIMESTAMP(time) >= (1396328400) AND UNIX_TIMESTAMP(time) < (1398920400) WHERE 1=1 AND 1=1
[2] => SELECT COUNT(*) AS expr_1564968823
FROM mtrack_log_error AS mtrack_log_error AND UNIX_TIMESTAMP(time) >= (1398920400) AND UNIX_TIMESTAMP(time) < (1401598800) WHERE 1=1 AND 1=1
)

the table was partitioned by time

Justin Swanhart

I checked the fix for this problem into git. Grab the latest version at http://github.com/greenlion/swanhart-tools

john

Thanks for the fix; there is no SQL syntax error anymore, but I still have a problem when the query is rewritten to
SELECT COUNT(*) AS count(*)
FROM mtrack_log_error AS mtrack_log_error WHERE 1=1 AND UNIX_TIMESTAMP(time) explain partitions select count(*) from mtrack_log_error where 1=1 AND UNIX_TIMESTAMP(time) explain partitions select count(*) from mtrack_log_error where 1=1 AND time < from_unixtime(1396328400) ORDER BY NULL;
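+----+-------------+------------------+------------+-------+---------------+------------+---------+------+----------+--------------------------+
| id | select_type | table            | partitions | type  | possible_keys | key        | key_len | ref  | rows     | Extra                    |
+----+-------------+------------------+------------+-------+---------------+------------+---------+------+----------+--------------------------+
|  1 | SIMPLE      | mtrack_log_error | p1403      | index | NULL          | error_time | 6       | NULL | 88128625 | Using where; Using index |
+----+-------------+------------------+------------+-------+---------------+------------+---------+------+----------+--------------------------+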

so it will only scan one partition.

Thanks

john

Not sure why my previous post is a mess; it is missing a lot of text. What I mean is that the query

select count(*) from mtrack_log_error where 1=1 AND UNIX_TIMESTAMP(time) < (1396328400) ORDER BY NULL;

should be changed to

select count(*) from mtrack_log_error where 1=1 AND time < from_unixtime(1396328400) ORDER BY NULL;

so it will scan only one partition instead of all partitions.

Thanks

john

Also I got below error in the final stage:

ERRORS RETURNED BY OPERATION:
Array
(
[0] => [message:Error while inserting: INSERT INTO aggregation_tmp_5974337 VALUES (144897986) ON DUPLICATE KEY UPDATE count(*)=count(*) + VALUES(count(*)):1317Query execution was interrupted] [node:shard1] [arc:0] [insert_id:0]
[1] => [message:Error while inserting: INSERT INTO aggregation_tmp_5974337 VALUES (95701615) ON DUPLICATE KEY UPDATE count(*)=count(*) + VALUES(count(*)):1317Query execution was interrupted] [node:shard1] [arc:0] [insert_id:0]
[2] => [message:Error while inserting: INSERT INTO aggregation_tmp_5974337 VALUES (0) ON DUPLICATE KEY UPDATE count(*)=count(*) + VALUES(count(*)):1317Query execution was interrupted] [node:shard1] [arc:0] [insert_id:0]
)
no query results
Exec time: 192.84175181389

Justin Swanhart

I think you are having problems because “time” is a function name in MySQL, and you are also using it as a column name. Shard-Query does not pass functions down to the shards, only columns, and it treats time as a function rather than a column.

I will need to think about how to fix this.

john

Do you mean the partition problem? if you change from UNIX_TIMESTAMP(time) < (1396328400) to time [message:Error while inserting: INSERT INTO aggregation_tmp_5974337 VALUES (144897986) ON DUPLICATE KEY UPDATE count(*)=count(*) + VALUES(count(*)):1317Query execution was interrupted] [node:shard1] [arc:0] [insert_id:0]

Thanks

Justin Swanhart

If the query only accesses one partition due to the WHERE clause, then Shard-Query won’t scan all the partitions as there is no data of interest in the other partitions.

john

The table has three partitions (three months of data). When I run the query select count(*) from mtrack_log_error, it is rewritten to three queries, like the ones below:

select count(*) from mtrack_log_error where 1=1 AND UNIX_TIMESTAMP(time) < (1396328400) ORDER BY NULL;
select count(*) from mtrack_log_error where 1=1 AND UNIX_TIMESTAMP(time) >= (1396328400) and UNIX_TIMESTAMP(time) < (1398920400) ORDER BY NULL;
select count(*) from mtrack_log_error where 1=1 AND UNIX_TIMESTAMP(time) >= (1398920400) and UNIX_TIMESTAMP(time) < (1401598800) ORDER BY NULL;

Even though each query returns only one month of data, it still scans all three partitions, because it uses UNIX_TIMESTAMP(time) < (1396328400), which is just like using a function on an indexed column. It should be changed to time < from_unixtime(1396328400).

john

Still got below error in the final stage, not sure how to fix it

ERRORS RETURNED BY OPERATION:
Array
(
[0] => [message:Error while inserting: INSERT INTO aggregation_tmp_5974337 VALUES (144897986) ON DUPLICATE KEY UPDATE count(*)=count(*) + VALUES(count(*)):1317Query execution was interrupted] [node:shard1] [arc:0] [insert_id:0]
[1] => [message:Error while inserting: INSERT INTO aggregation_tmp_5974337 VALUES (95701615) ON DUPLICATE KEY UPDATE count(*)=count(*) + VALUES(count(*)):1317Query execution was interrupted] [node:shard1] [arc:0] [insert_id:0]
[2] => [message:Error while inserting: INSERT INTO aggregation_tmp_5974337 VALUES (0) ON DUPLICATE KEY UPDATE count(*)=count(*) + VALUES(count(*)):1317Query execution was interrupted] [node:shard1] [arc:0] [insert_id:0]
)
no query results
Exec time: 192.84175181389

Justin Swanhart

It looks like you are using an old version of Shard-Query. With --verbose, the latest version should print output like this:
SQL TO SEND TO SHARDS:
Array
(
[0] => SELECT COUNT(*) AS expr_2828757189
FROM mtrack_log_error AS mtrack_log_error WHERE 1=1 AND UNIX_TIMESTAMP(time) SELECT COUNT(*) AS expr_2828757189
FROM mtrack_log_error AS mtrack_log_error WHERE 1=1 AND UNIX_TIMESTAMP(time) >= (1396328400) AND UNIX_TIMESTAMP(time) SELECT COUNT(*) AS expr_2828757189
FROM mtrack_log_error AS mtrack_log_error WHERE 1=1 AND UNIX_TIMESTAMP(time) >= (1398920400) AND UNIX_TIMESTAMP(time) 2
)
1 rows returned
Exec time: 0.019656181335449

You can get the latest version from http://github.com/greenlion/swanhart-tools

john

I downloaded Shard-Query from http://github.com/greenlion/swanhart-tools on June 2, so I am not sure whether it is the old version or not. I have downloaded it again. Do I just need to copy shard-query to /usr/share/, or do I need to redo all of the setup after the copy?

Thanks

Joe

nice work, would you give an example on how
“Shard-Query can work over multiple nodes”

Rick James

How fast is SELECT COUNT(DISTINCT LO_OrderDateKey) FROM lineorder; with partitioning removed? (I suspect this is an extreme example of partitioning hurting.)

Rick James

In your tests, was all the data cached in RAM? Small enough to fit in RAM? Or necessarily I/O-bound? I suspect Shard-Query shines in the first case.

Lulo

It looks like Shard-Query (or the Gearman library) has some issues with accented characters: if a column contains such characters, the result value comes back as NULL. Have you experienced that? How can I fix it?

NaingAung

I would like to know how I can get the lineorder table (data set) so that I can follow this example.