Discussion: Parallel Foreign Scans - need advice
Hi all, I’m working on an FDW that would benefit greatly from parallel foreign scan. I have implemented the callbacks described here: https://www.postgresql.org/docs/devel/fdw-callbacks.html#FDW-CALLBACKS-PARALLEL, and I see a big improvement in certain plans.

My problem is that I can’t seem to get a parallel foreign scan in a query that does not contain an aggregate. For example:

  SELECT count(*) FROM foreign table;

gives me a parallel scan, but

  SELECT * FROM foreign table;

does not. I’ve been fiddling with the costing GUCs, foreign scan row estimates, and foreign scan cost estimates - I can force the cost of a partial path to be much lower than that of a sequential foreign scan, but no luck. Any troubleshooting advice?

A second, related question - how can I find the actual number of workers chosen for my ForeignScan? At the moment, I am looking at ParallelContext->nworkers (inside of the InitializeDSMForeignScan() callback) because that seems to be the first callback function that might provide the worker count. I need the *actual* worker count in order to evenly distribute my workload. I can’t use the usual trick of having each worker grab the next available chunk (because I have to avoid seek operations on compressed data). In other words, it is of great advantage for each worker to read contiguous chunks of data - seeking to another part of the file is prohibitively expensive.

Thanks for all help.

— Korry
Hi,

On 2019-05-15 12:55:33 -0400, Korry Douglas wrote:
> Hi all, I’m working on an FDW that would benefit greatly from parallel
> foreign scan. I have implemented the callbacks described here:
> https://www.postgresql.org/docs/devel/fdw-callbacks.html#FDW-CALLBACKS-PARALLEL,
> and I see a big improvement in certain plans.
>
> My problem is that I can’t seem to get a parallel foreign scan in a
> query that does not contain an aggregate.
>
> For example:
> SELECT count(*) FROM foreign table;
> Gives me a parallel scan, but
> SELECT * FROM foreign table;
> Does not.

Well, that'd be bound by the cost of transferring tuples between workers and leader. You don't get, unless you fiddle heavily with the costs, a parallel scan for the equivalent local table scan either. You can probably force the planner's hand by setting parallel_setup_cost and parallel_tuple_cost very low - but it's unlikely to be beneficial.

If you added a WHERE clause that needs to be evaluated outside the FDW, you'd probably see parallel scans without fiddling with the costs.

> A second related question - how can I find the actual number of
> workers chosen for my ForeignScan? At the moment, I am looking at
> ParallelContext->nworkers (inside of the InitializeDSMForeignScan()
> callback) because that seems to be the first callback function that
> might provide the worker count. I need the *actual* worker count in
> order to evenly distribute my workload. I can’t use the usual trick
> of having each worker grab the next available chunk (because I have to
> avoid seek operations on compressed data). In other words, it is of
> great advantage for each worker to read contiguous chunks of data -
> seeking to another part of the file is prohibitively expensive.

Don't think - but am not sure - that there's a nicer way currently. Although I'd use nworkers_launched, rather than nworkers.

Greetings,

Andres Freund
Thanks for the quick answer Andres. You’re right - it was parallel_tuple_cost that was getting in my way; my query returns about 6 million rows, so I guess that can add up.

If I change parallel_tuple_cost from 0.1 to 0.0001, I get a parallel foreign scan.

With 4 workers, that reduces my execution time by about half.

But, nworkers_launched is always set to 0 in InitializeDSMForeignScan(), so that won’t work. Any other ideas?

— Korry

> On May 15, 2019, at 1:08 PM, Andres Freund <andres@anarazel.de> wrote:
>
> Hi,
>
> On 2019-05-15 12:55:33 -0400, Korry Douglas wrote:
>> Hi all, I’m working on an FDW that would benefit greatly from parallel
>> foreign scan. I have implemented the callbacks described here:
>> https://www.postgresql.org/docs/devel/fdw-callbacks.html#FDW-CALLBACKS-PARALLEL,
>> and I see a big improvement in certain plans.
>>
>> My problem is that I can’t seem to get a parallel foreign scan in a
>> query that does not contain an aggregate.
>>
>> For example:
>> SELECT count(*) FROM foreign table;
>> Gives me a parallel scan, but
>> SELECT * FROM foreign table;
>> Does not.
>
> Well, that'd be bound by the cost of transferring tuples between workers
> and leader. You don't get, unless you fiddle heavily with the costs, a
> parallel scan for the equivalent local table scan either. You can
> probably force the planner's hand by setting parallel_setup_cost,
> parallel_tuple_cost very low - but it's unlikely to be beneficial.
>
> If you added a WHERE clause that needs to be evaluated outside the FDW,
> you'd probably see parallel scans without fiddling with the costs.
>
>> A second related question - how can I find the actual number of
>> workers chosen for my ForeignScan? At the moment, I am looking at
>> ParallelContext->nworkers (inside of the InitializeDSMForeignScan()
>> callback) because that seems to be the first callback function that
>> might provide the worker count. I need the *actual* worker count in
>> order to evenly distribute my workload. I can’t use the usual trick
>> of having each worker grab the next available chunk (because I have to
>> avoid seek operations on compressed data). In other words, it is of
>> great advantage for each worker to read contiguous chunks of data -
>> seeking to another part of the file is prohibitively expensive.
>
> Don't think - but am not sure - that there's a nicer way
> currently. Although I'd use nworkers_launched, rather than nworkers.
>
> Greetings,
>
> Andres Freund
Hi,

Don't top quote on these lists...

On 2019-05-15 13:31:59 -0400, Korry Douglas wrote:
> Thanks for the quick answer Andres. You’re right - it was
> parallel_tuple_cost that was getting in my way; my query returns about
> 6 million rows, so I guess that can add up.
>
> If I change parallel_tuple_cost from 0.1 to 0.0001, I get a parallel
> foreign scan.
>
> With 4 workers, that reduces my execution time by about half.

Then you probably need to adjust the scan costs you have.

> But, nworkers_launched is always set to 0 in
> InitializeDSMForeignScan(), so that won’t work. Any other ideas?

At that stage it's simply not yet known how many workers will actually be launched (they might not start successfully or such). Why do you need to know it there and not later?

- Andres
>> But, nworkers_launched is always set to 0 in
>> InitializeDSMForeignScan(), so that won’t work. Any other ideas?
>
> At that stage it's simply not yet known how many workers will actually
> be launched (they might not start successfully or such). Why do
> you need to know it there and not later?
>
> - Andres

I need to know at some point *before* I actually start scanning. The ParallelContext pointer is only available in EstimateDSMForeignScan(), InitializeDSMForeignScan(), and ReInitializeDSMForeignScan().

If there is some other way to discover the actual worker count, I’m open to that. The three functions above are not particularly helpful to me, so I’m happy to look somewhere else.

— Korry
On Thu, May 16, 2019 at 5:46 AM Korry Douglas <korry@me.com> wrote:
> >> But, nworkers_launched is always set to 0 in
> >> InitializeDSMForeignScan(), so that won’t work. Any other ideas?
> >
> > At that stage it's simply not yet known how many workers will actually
> > be launched (they might not start successfully or such). Why do
> > you need to know it there and not later?
> >
> > - Andres
>
> I need to know at some point *before* I actually start scanning. The
> ParallelContext pointer is only available in EstimateDSMForeignScan(),
> InitializeDSMForeignScan(), and ReInitializeDSMForeignScan().

Hi Korry,

That's only a superficial problem. You don't even know if or when the workers that are launched will all finish up running your particular node, because (for example) they might be sent to different children of a Parallel Append node above you (AFAICS there is no way for a participant to indicate "I've finished all the work allocated to me, but I happen to know that some other worker #3 is needed here" -- as soon as any participant reports that it has executed the plan to completion, pa_finished[] will prevent new workers from picking that node to execute).

Suppose we made a rule that *every* worker must visit *every* partial child of a Parallel Append and run it to completion (and any similar node in the future must do the same)... then I think there is still a higher-level design problem: if you allocate work up front rather than on demand, then work could be unevenly distributed, and parallel query would be weakened.

So I think you ideally need a simple get-next-chunk work allocator (like Parallel Seq Scan, and like the file_fdw patch I posted[1]), or a pass-the-baton work allocator when there is a dependency between chunks (like Parallel Index Scan for btrees), or a more complicated multi-phase system that counts participants arriving and joining in (like Parallel Hash) so that participants can coordinate and wait for each other in controlled circumstances.
If this compressed data doesn't have natural chunks designed for this purpose (like, say, ORC stripes), perhaps you could have dedicated workers streaming data (compressed? decompressed?) into shared memory, and parallel query participants coordinating to consume chunks of that?

[1] https://www.postgresql.org/message-id/CA%2BhUKG%2BqK3E2RF75PKfsV0sn2s018%2Bft--hUuCmd2R_yQ9tmPQ%40mail.gmail.com

--
Thomas Munro
https://enterprisedb.com
> That's only a superficial problem. You don't even know if or when the
> workers that are launched will all finish up running your particular
> node, because (for example) they might be sent to different children
> of a Parallel Append node above you (AFAICS there is no way for a
> participant to indicate "I've finished all the work allocated to me,
> but I happen to know that some other worker #3 is needed here" -- as
> soon as any participant reports that it has executed the plan to
> completion, pa_finished[] will prevent new workers from picking that
> node to execute). Suppose we made a rule that *every* worker must
> visit *every* partial child of a Parallel Append and run it to
> completion (and any similar node in the future must do the same)...
> then I think there is still a higher-level design problem: if you
> allocate work up front rather than on demand, then work could be
> unevenly distributed, and parallel query would be weakened.

What I really need (for the scheme I’m using at the moment) is to know how many workers will be used to execute my particular Plan. I understand that some workers will naturally end up idle while the last (busy) worker finishes up. I’m dividing the workload (the number of row groups to scan) by the number of workers to get an even distribution. I’m willing to pay that price (at least, I haven’t seen a problem so far... famous last words).

I do plan to switch over to a get-next-chunk allocator as you mentioned below, but I’d like to get the minimized-seek mechanism working first.

It sounds like there is no reliable way to get the information that I’m looking for, is that right?
> So I think you ideally need a simple get-next-chunk work allocator
> (like Parallel Seq Scan and like the file_fdw patch I posted[1]), or a
> pass-the-baton work allocator when there is a dependency between
> chunks (like Parallel Index Scan for btrees), or a more complicated
> multi-phase system that counts participants arriving and joining in
> (like Parallel Hash) so that participants can coordinate and wait for
> each other in controlled circumstances.

I haven’t looked at Parallel Hash - will try to understand that next.

> If this compressed data doesn't have natural chunks designed for this
> purpose (like, say, ORC stripes), perhaps you could have dedicated
> workers streaming data (compressed? decompressed?) into shared memory,
> and parallel query participants coordinating to consume chunks of
> that?

I’ll give that some thought.

Thanks for the ideas.

— Korry
On Fri, May 17, 2019 at 12:45 AM Korry Douglas <korry@me.com> wrote:
> It sounds like there is no reliable way to get the information that I’m looking for, is that right?

Correct. And if there were, it could only be used to write bugs. Let me see if I can demonstrate... I'll use the file_fdw patch from the link I gave before, and I'll add an elog(LOG) message to show when fileIterateForeignScan() runs.

$ echo 1 > /tmp/t2

postgres=# create table t1 as select generate_series(1, 1000000)::int i;
SELECT 1000000
postgres=# create server files foreign data wrapper file_fdw;
CREATE SERVER
postgres=# create foreign table t2 (n int) server files options (filename '/tmp/t2', format 'csv');
CREATE FOREIGN TABLE

The relevant EXPLAIN output is harder to understand if the parallel leader participates, but it changes nothing important, so I'll turn that off first, and then see how it is run:

postgres=# set parallel_leader_participation = off;
SET
postgres=# explain (analyze, verbose) select count(*) from (select * from t1 union all select * from t2) ss;
                                                                     QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=14176.32..14176.33 rows=1 width=8) (actual time=234.023..234.023 rows=1 loops=1)
   Output: count(*)
   ->  Gather  (cost=14176.10..14176.31 rows=2 width=8) (actual time=233.840..235.079 rows=2 loops=1)
         Output: (PARTIAL count(*))
         Workers Planned: 2
         Workers Launched: 2
         ->  Partial Aggregate  (cost=13176.10..13176.11 rows=1 width=8) (actual time=223.550..223.555 rows=1 loops=2)
               Output: PARTIAL count(*)
               Worker 0: actual time=223.432..223.443 rows=1 loops=1
               Worker 1: actual time=223.667..223.668 rows=1 loops=1
               ->  Parallel Append  (cost=0.00..11926.10 rows=500000 width=0) (actual time=0.087..166.669 rows=500000 loops=2)
                     Worker 0: actual time=0.083..166.366 rows=499687 loops=1
                     Worker 1: actual time=0.092..166.972 rows=500314 loops=1
                     ->  Parallel Seq Scan on public.t1  (cost=0.00..9425.00 rows=500000 width=0) (actual time=0.106..103.384 rows=500000 loops=2)
                           Worker 0: actual time=0.123..103.106 rows=499686 loops=1
                           Worker 1: actual time=0.089..103.662 rows=500314 loops=1
                     ->  Parallel Foreign Scan on public.t2  (cost=0.00..1.10 rows=1 width=0) (actual time=0.079..0.096 rows=1 loops=1)
                           Foreign File: /tmp/numbers
                           Foreign File Size: 2 b
                           Worker 0: actual time=0.079..0.096 rows=1 loops=1
 Planning Time: 0.219 ms
 Execution Time: 235.262 ms
(22 rows)

You can see that the Parallel Foreign Scan was only actually run by one worker. So if you were somehow expecting both of them to show up in order to produce the correct results, you have a bug. The reason that happened is that Parallel Append sent one worker to chew on t1, and another to chew on t2, but the scan of t2 finished very quickly, so that worker then went to help out with t1. And for further proof of that, here's what I see in my server log (note only ever called twice, and in the same process):

2019-05-17 10:51:42.248 NZST [52158] LOG:  fileIterateForeignScan
2019-05-17 10:51:42.248 NZST [52158] STATEMENT:  explain analyze select count(*) from (select * from t1 union all select * from t2) ss;
2019-05-17 10:51:42.249 NZST [52158] LOG:  fileIterateForeignScan
2019-05-17 10:51:42.249 NZST [52158] STATEMENT:  explain analyze select count(*) from (select * from t1 union all select * from t2) ss;

Therefore you can't allocate the work up front based on the expected number of workers, even if it works in simple examples. Your node isn't necessarily the only node in the plan, and higher-up nodes get to decide when, if at all, you run, in each worker.

--
Thomas Munro
https://enterprisedb.com