Thread: Re: Adjusting hash join memory limit to handle batch explosion

Re: Adjusting hash join memory limit to handle batch explosion

From: Melanie Plageman
Date:
On Tue, Dec 31, 2024 at 6:07 PM Tomas Vondra <tomas@vondra.me> wrote:
>
> This means that ultimately it's either (1) or (3), and the more I've
> been looking into this the more I prefer (1), for a couple reasons:
>
> * It's much simpler (it doesn't really change anything on the basic
> behavior, doesn't introduce any new files or anything like that.
>
> * There doesn't seem to be major difference in total memory consumption
> between the two approaches. Both allow allocating more memory.
>
> * It actually helps with the "indivisible batch" case - it relaxes the
> limit, so there's a chance the batch eventually fits and we stop adding
> more and more batches. With spill files that's not the case - we still
> keep the original limit, and we end up with the batch explosion (but
> then we handle it much more efficiently).

Okay, I've read all the patches proposed in this mail and most of the
downthread ideas, and I want to cast my vote for option 1.
I find the design of 3 too complicated for what it buys us.

The slices make it harder to understand how the system works. The
current 1-1 relationship in master between batches and spill files is
easier to reason about. With the slices, I'm imagining trying to
understand why we, for example, have to move tuples from batch 4 just
because batch 5 got too big for the hashtable.

I think something like this might be worth it if it solved the problem
entirely, but if it is just a somewhat better coping mechanism, I
don't think it is worth it.

I was excited about your raw file experiment. As Robert and you point
out -- we may need a file per batch, but for most of the hash join's
execution we don't need to keep buffers for each batch around.
However, given that the experiment didn't yield great results and we
haven't come up with an alternative solution with sufficiently few
flaws, I'm still in favor of 1.

The part of 1 I struggled to understand is the math in
ExecHashExceededMemoryLimit(). I think the other email you sent with
the charts and diagonals is about choosing the optimal hashtable size
and number of batches (when to stop growing the number of batches and
just increase the size of the hashtable). So, I'll dive into that.

> One thing I'm not sure about yet is whether this needs to tweak the
> hashjoin costing to also consider the files when deciding how many
> batches to use. Maybe it should?

I think it definitely should. The ExecChooseHashTableSize()
calculations look similar to what we use to calculate spaceAllowed, so
it makes sense that we would consider buffile sizes if we are counting
those in spaceUsed now.
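
Something along these lines is what I'm imagining, just as a sketch of
the idea (made-up names, not the actual ExecChooseHashTableSize() code):

    /*
     * Sketch only: when picking nbatch, charge the per-batch BufFile
     * buffers (one 8kB buffer each for the inner and outer file of every
     * batch) against the memory budget, instead of counting only the
     * hash table.
     */
    static int
    choose_nbatch_sketch(double inner_rel_bytes, double hash_table_bytes)
    {
        int         nbatch = 1;
        double      batch_bytes = inner_rel_bytes;

        while (batch_bytes > hash_table_bytes)
        {
            /*
             * Doubling nbatch halves the batch size but adds 2 * nbatch
             * more file buffers.  Once the buffers cost more than the
             * halving saves, growing nbatch no longer reduces total
             * memory use.
             */
            if (2.0 * nbatch * BLCKSZ > batch_bytes / 2)
                break;

            nbatch *= 2;
            batch_bytes = inner_rel_bytes / nbatch;
        }

        return nbatch;
    }

At some point it's simply cheaper to let the hash table grow than to keep
doubling the number of batch files.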

- Melanie



Re: Adjusting hash join memory limit to handle batch explosion

From: Tomas Vondra
Date:

On 1/9/25 21:42, Melanie Plageman wrote:
> On Tue, Dec 31, 2024 at 6:07 PM Tomas Vondra <tomas@vondra.me> wrote:
>>
>> This means that ultimately it's either (1) or (3), and the more I've
>> been looking into this the more I prefer (1), for a couple reasons:
>>
>> * It's much simpler (it doesn't really change anything on the basic
>> behavior, doesn't introduce any new files or anything like that.
>>
>> * There doesn't seem to be major difference in total memory consumption
>> between the two approaches. Both allow allocating more memory.
>>
>> * It actually helps with the "indivisible batch" case - it relaxes the
>> limit, so there's a chance the batch eventually fits and we stop adding
>> more and more batches. With spill files that's not the case - we still
>> keep the original limit, and we end up with the batch explosion (but
>> then we handle it much more efficiently).
> 
> Okay, I've read all the patches proposed in this mail and most of the
> downthread ideas, and I want to cast my vote for option 1.
> I find the design of 3 too complicated for what it buys us.
> 
> The slices make it harder to understand how the system works. The
> current 1-1 relationship in master between batches and spill files is
> easier to reason about. With the slices, I'm imagining trying to
> understand why we, for example, have to move tuples from batch 4 just
> because batch 5 got too big for the hashtable.
> 
> I think something like this might be worth it if it solved the problem
> entirely, but if it is just a somewhat better coping mechanism, I
> don't think it is worth it.
> 
> I was excited about your raw file experiment. As Robert and you point
> out -- we may need a file per batch, but for most of the hash join's
> execution we don't need to keep buffers for each batch around.
> However, given that the experiment didn't yield great results and we
> haven't come up with an alternative solution with sufficiently few
> flaws, I'm still in favor of 1.
> 

But I think those were two distinct proposals.

My experiment with raw files keeps adding batches just like the current
code (so it might quickly explode to 1M batches) and then keeps feeding
data to 1M files at the same time. This doesn't work - the buffering
clearly helps a lot, and losing it would affect all hashjoins, even
those with fewer batches.

Robert's idea kept using buffered files, but limited how many we can
fill at any phase. Say we'd use a limit of 1024 batches, but we actually
need 1M batches. Then we'd do the build in two phases - we'd generate
1024 batches, and then we'd split each of those batches into 1024
smaller batches. The trick (as I understand it) is that those batches can't
overlap, so we'd never need more than 1024 batches open at a time, which
greatly limits the memory consumption. We could even use a lower limit,
derived from
work_mem or something like that.
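
To make the two-phase idea a bit more concrete, here's a tiny standalone
illustration of the bit arithmetic as I understand the proposal (not
executor code):

    #include <stdio.h>

    int
    main(void)
    {
        unsigned    nbatch_total = 1024 * 1024;  /* batches we actually need */
        unsigned    max_open = 1024;             /* batch files open per pass */
        unsigned    batchno = 123456;            /* final batch of some tuple */

        /* pass 1: the tuple goes into one of 1024 coarse batch files */
        unsigned    coarse = batchno & (max_open - 1);

        /* pass 2: re-read that coarse batch, split it into 1024 finer ones */
        unsigned    fine = (batchno / max_open) & (max_open - 1);

        printf("of %u total batches: batch %u = coarse %u, sub-batch %u\n",
               nbatch_total, batchno, coarse, fine);
        return 0;
    }

So we never have more than 1024 batch files open at a time, even though we
end up with 1M batches in total.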

Of course, this is a more complex change than the "balancing" patch. But
maybe not that much, not sure. For me the main disadvantage is it
doesn't really help with the batch explosion for skewed data sets (or
data with many hash collisions). It can easily happen that we blindly
increase nbatch until we use all the bits, and then break the work_mem
limit anyway.

But maybe there's a way to address that - the growEnabled=false safety
is an unreliable solution, because it requires the whole batch to fall
into just one of the two new batches. A single tuple breaks that.

What if we instead compared the two new batches and looked at how far
the split is from 1/2? And if it's very far from 1/2, we'd either
increase work_mem (a bit like the balancing), or disable nbatch
increases (maybe just temporarily).

> The part of 1 I struggled to understand is the math in
> ExecHashExceededMemoryLimit(). I think the other email you sent with
> the charts and diagonals is about choosing the optimal hashtable size
> and number of batches (when to stop growing the number of batches and
> just increase the size of the hashtable). So, I'll dive into that.
> 

That math is a bit unclear even to me - that patch was written before I
took the time to work out the formulas and visualizations. It works and
makes roughly the right decisions, but with less rigor. So maybe don't
waste too much time trying to understand it.

>> One thing I'm not sure about yet is whether this needs to tweak the
>> hashjoin costing to also consider the files when deciding how many
>> batches to use. Maybe it should?
> 
> I think it definitely should. The ExecChooseHashTableSize()
> calculations look similar to what we use to calculate spaceAllowed, so
> it makes sense that we would consider buffile sizes if we are counting
> those in spaceUsed now.
> 

Yeah. I think the flaw is we may not actually know the number of batches
during planning. In the batch explosion example we start with very few
batches - the explosion only happens during execution.


regards

-- 
Tomas Vondra




Re: Adjusting hash join memory limit to handle batch explosion

From: Melanie Plageman
Date:
On Thu, Jan 9, 2025 at 6:59 PM Tomas Vondra <tomas@vondra.me> wrote:
>
>
>
> On 1/9/25 21:42, Melanie Plageman wrote:
> >
> > I was excited about your raw file experiment. As Robert and you point
> > out -- we may need a file per batch, but for most of the hash join's
> > execution we don't need to keep buffers for each batch around.
> > However, given that the experiment didn't yield great results and we
> > haven't come up with an alternative solution with sufficiently few
> > flaws, I'm still in favor of 1.
> >
>
> But I think those were two distinct proposals.
>
> My experiment with raw files keeps adding batches just like the current
> code (so it might quickly explode to 1M batches) and then keep feeding
> data to 1M files at the same time. This doesn't work, the buffering
> clearly helps a lot, and it'd affect all hashjoins, even those with
> fewer batches.

I see.

> Robert's idea kept using buffered files, but limited how many we can
> fill at any phase. Say we'd use a limit of 1024 batches, but we actually
> need 1M batches. Then we'd do the build in two phases - we'd generate
> 1024 batches, and then we'd split each of those batches into 1024
> smaller batches. The trick (as I understand it) is those batches can't
> overlap, so we'd not need more than 1024 batches, which greatly limits
> the memory consumption. We could even use a lower limit, derived from
> work_mem or something like that.

I think this is because we get the batch based on

*batchno = pg_rotate_right32(hashvalue, hashtable->log2_nbuckets) &
(nbatch - 1);

And tuples can only spill forward. I think Robert's example is that if we
plan for 64 batches and eventually increase to 256 batches, a tuple
assigned to batch 1 could go to batch 65, 129, or 193 but no other batch --
meaning we would only need 3 files open when processing batch 1. But I
think we would need to do more explicit file flushing and closing and
opening, right? Which maybe doesn't matter when compared to the
overhead of so many more buffers.
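
To sanity-check that to myself, here's the math in standalone form
(ignoring the pg_rotate_right32() of the bucket bits, so not the exact
executor computation):

    #include <stdio.h>

    int
    main(void)
    {
        /* four hash values that all map to batch 1 while nbatch = 64 */
        unsigned    hashes[] = {0x01, 0x41, 0x81, 0xc1};

        for (int i = 0; i < 4; i++)
        {
            unsigned    old_batch = hashes[i] & (64 - 1);   /* always 1 */
            unsigned    new_batch = hashes[i] & (256 - 1);  /* 1, 65, 129, 193 */

            printf("hash 0x%02x: batch %u -> batch %u\n",
                   hashes[i], old_batch, new_batch);
        }
        return 0;
    }

So after the increase, a tuple from batch 1 can only end up in batch 1,
65, 129, or 193 -- only 3 new files.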

> Of course, this is a more complex change than the "balancing" patch. But
> maybe not that much, not sure. For me the main disadvantage is it
> doesn't really help with the batch explosion for skewed data sets (or
> data with many hash collisions). It can easily happen we blindly
> increase nbatch until we use all the bits, and then break the work_mem
> limit anyway.
>
> But maybe there's a way to address that - the growthEnabled=false safety
> is an unreliable solution, because it requires the whole batch to fall
> to either of the new batches. A single tuple breaks that.
>
> What if we instead compared the two new batches, and instead looked at
> how far the split is from 1/2? And if it's very far from 1/2, we'd
> either increase work_mem (a bit like the balancing), or disable nbatch
> increases (maybe just temporarily).

Meaning like have some threshold for the number of tuples over the
limit we are? Right now, we decide to increase batches when we
encounter that one tuple that puts us over the limit. So, I could see
it making sense to decide with more foresight. Or we could even keep
track of the amount over the limit we are and increase the number of
batches once we hit that threshold.

This kind of seems like it would circle back to your algorithm for
deciding on the right tradeoff between hashtable size and number of
batches, though.

You could do something like this _and_ do something like close the
files that can't be the target of tuples from the current batch --
which would allow you to tolerate many more batch increases before
doubling the hashtable size is worth it. But it seems like the
algorithm to adapt the hashtable size based on the optimal tradeoff
between hashtable size and number of batches could be done first and
the patch to close files could be done later.

- Melanie



Re: Adjusting hash join memory limit to handle batch explosion

From: Tomas Vondra
Date:

On 1/10/25 15:54, Melanie Plageman wrote:
> On Thu, Jan 9, 2025 at 6:59 PM Tomas Vondra <tomas@vondra.me> wrote:
>>
>>
>>
>> On 1/9/25 21:42, Melanie Plageman wrote:
>>>
>>> I was excited about your raw file experiment. As Robert and you point
>>> out -- we may need a file per batch, but for most of the hash join's
>>> execution we don't need to keep buffers for each batch around.
>>> However, given that the experiment didn't yield great results and we
>>> haven't come up with an alternative solution with sufficiently few
>>> flaws, I'm still in favor of 1.
>>>
>>
>> But I think those were two distinct proposals.
>>
>> My experiment with raw files keeps adding batches just like the current
>> code (so it might quickly explode to 1M batches) and then keep feeding
>> data to 1M files at the same time. This doesn't work, the buffering
>> clearly helps a lot, and it'd affect all hashjoins, even those with
>> fewer batches.
> 
> I see.
> 
>> Robert's idea kept using buffered files, but limited how many we can
>> fill at any phase. Say we'd use a limit of 1024 batches, but we actually
>> need 1M batches. Then we'd do the build in two phases - we'd generate
>> 1024 batches, and then we'd split each of those batches into 1024
>> smaller batches. The trick (as I understand it) is those batches can't
>> overlap, so we'd not need more than 1024 batches, which greatly limits
>> the memory consumption. We could even use a lower limit, derived from
>> work_mem or something like that.
> 
> I think this is because we get the batch based on
> 
> *batchno = pg_rotate_right32(hashvalue, hashtable->log2_nbuckets) &
> (nbatch - 1);
> 
> And tuples can only spill forward. I think Robert's example is if we
> plan for 64 batches and eventually increase to 256 batches, a tuple
> assigned to batch 1 could go to 65, 129, or 193 but no other batch --
> meaning we would only need 3 files open when processing batch 1.

Yes, I think that's why we only need 3 more files when splitting a
batch. The way I explain it is that going from 64 -> 256 adds 2 more
bits to the "batchno" part of the hash value, and one of the four possible
patterns means "current batch", so 3 new files.

This does remind me of the "one spill file per slice" patch in [1],
although it approaches it from a different angle. My patch defined the
"slice" as batches we can keep in work_mem, while Robert proposed to
decide how many batches we can open (first level of batching), and then
maybe do that recursively if needed. That seems like a fundamentally
more sound approach (indeed, my patch can create too many slices).

[1]
https://www.postgresql.org/message-id/20190428141901.5dsbge2ka3rxmpk6%40development

> But I think we would need to do more explicit file flushing and
> closing and opening, right? Which maybe doesn't matter when compared
> to the overhead of so many more buffers.
Would it be all that much flushing and closing? Yes, we'd need to flush
and release the buffers (which I don't think BufFiles can do right now,
but let's ignore that for now). But I'd hope the batches are fairly
large (because that's why we expect to generate them), so one more write
should not make a lot of difference on top of the actual batch split.
Chances are it's far cheaper than the extra memory pressure due to
keeping all batches in memory ...

I wonder if it might be a problem that those future batches are "far
apart". I mean, we're splitting batch 1, and the tuple may go to batches
+64, +128 and +192 batches ahead. If we allow larger "jumps" (e.g. from
256 to 64k batches), it'd be even more visible.

For the case of batch explosion I don't think it matters too much - it
will still explode into an absurd number of batches, that doesn't change.
But that's fine, the point is to not cause OOM. Improving this case
would require increasing the work_mem limit (either directly or by
stopping the growth).

For regular cases I think the idea is the limit would be high enough to
not really hit this too often. I mean, how many real-world queries use
more than ~1024 batches? I don't think that's very common.

>> Of course, this is a more complex change than the "balancing" patch. But
>> maybe not that much, not sure. For me the main disadvantage is it
>> doesn't really help with the batch explosion for skewed data sets (or
>> data with many hash collisions). It can easily happen we blindly
>> increase nbatch until we use all the bits, and then break the work_mem
>> limit anyway.
>>
>> But maybe there's a way to address that - the growthEnabled=false safety
>> is an unreliable solution, because it requires the whole batch to fall
>> to either of the new batches. A single tuple breaks that.
>>
>> What if we instead compared the two new batches, and instead looked at
>> how far the split is from 1/2? And if it's very far from 1/2, we'd
>> either increase work_mem (a bit like the balancing), or disable nbatch
>> increases (maybe just temporarily).
> 
> Meaning like have some threshold for the number of tuples over the
> limit we are? Right now, we decide to increase batches when we
> encounter that one tuple that puts us over the limit. So, I could see
> it making sense to decide with more foresight. Or we could even keep
> track of the amount over the limit we are and increase the number of
> batches once we hit that threshold.
> 

Not sure I understand. I meant that we disable nbatch growth like this:

   if (nfreed == 0 || nfreed == ninmemory)
   {
       hashtable->growEnabled = false;
   }

which means that it only takes a single tuple that makes it to the other
batch to keep growing. But if 99.9999% tuples went to one of the
batches, increasing nbatch seems pretty futile.
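
In other words, something along these lines instead of the all-or-nothing
check (just a sketch, the 5% threshold is arbitrary):

    double      frac = (double) nfreed / ninmemory;

    /*
     * React once the split is sufficiently lopsided, not only when it is
     * exactly 100:0 - either stop doubling nbatch (perhaps temporarily),
     * or relax the memory limit a bit instead.
     */
    if (frac < 0.05 || frac > 0.95)
        hashtable->growEnabled = false;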

But it goes in the opposite direction too. Imagine a uniform data set
with plenty of distinct values, but correlated / sorted, and each value
having more rows than can fit into a single batch. We'll immediately
disable growth, which is ... not great.

These are somewhat separate / independent issues, but I think having a
concept of "retrying the nbatch growth after a while" would help.

> This kind of seems like it would circle back to your algorithm for
> deciding on the right tradeoff between hashtable size and number of
> batches, though.

Yes, it's about the same general idea, just expressed in a slightly
different way (the part about growing work_mem).

> 
> You could do something like this _and_ do something like close the
> files that can't be the target of tuples from the current batch --
> which would allow you to tolerate many more batch increases before
> doubling the hashtable size is worth it. But it seems like the
> algorithm to adapt the hashtable size based on the optimal tradeoff
> between hashtable size and number of batches could be done first and
> the patch to close files could be done later.
> 

Right. I don't think Robert's idea is a complete answer, because it
does not consider the tradeoff that maybe increasing work_mem would be
better. OTOH maybe that's not something the hashjoin should worry about.
The goal is not to optimize the work_mem value, but make sure we don't
use significantly more memory ...

If hashjoin starts to optimize this, why shouldn't the other places
using work_mem do something similar?

regards

-- 
Tomas Vondra




Re: Adjusting hash join memory limit to handle batch explosion

From: Melanie Plageman
Date:
On Fri, Jan 10, 2025 at 11:18 AM Tomas Vondra <tomas@vondra.me> wrote:
>
> On 1/10/25 15:54, Melanie Plageman wrote:
> > On Thu, Jan 9, 2025 at 6:59 PM Tomas Vondra <tomas@vondra.me> wrote:
> > I think this is because we get the batch based on
> >
> > *batchno = pg_rotate_right32(hashvalue, hashtable->log2_nbuckets) &
> > (nbatch - 1);
> >
> > And tuples can only spill forward. I think Robert's example is if we
> > plan for 64 batches and eventually increase to 256 batches, a tuple
> > assigned to batch 1 could go to 65, 129, or 193 but no other batch --
> > meaning we would only need 3 files open when processing batch 1.
>
> Yes, I think that's why we only need 3 more files when splitting a
> batch. The way I explain it is that going from 64 -> 256 adds 2 more
> bits to the "batchno" part of the batch, and one of the patterns means
> "current batch", so 3 new files.
>
> This does remind me the "one spill file per slice" patch in [1],
> although it approaches it from a different angle. My patch defined the
> "slice" as batches we can keep in work_mem, while Robert proposed to
> decide how many batches we can open (first level of batching), and then
> maybe do that recursively if needed. That seems like a fundamentally
> more sound approach (indeed, my patch can create too many slices).
>
> [1]
> https://www.postgresql.org/message-id/20190428141901.5dsbge2ka3rxmpk6%40development
>
> > But I think we would need to do more explicit file flushing and
> > closing and opening, right? Which maybe doesn't matter when compared
> > to the overhead of so many more buffers.
> Would it be all that much flushing and closing? Yes, we'd need to flush
> and release the buffers (which I don't think BufFiles can do right now,
> but let's ignore that for now). But I'd hope the batches are fairly
> large (because that's why expect to generate them), so one more write
> should not make a lot of difference on top of the actual bach split.
> Chances are it's far cheaper than the extra memory pressure due to
> keeping all batches in memory ...
>
> I wonder if it might be a problem that those future batches are "far
> apart". I mean, we're splitting batch 1, and the tuple may go to batches
> +64, +128 and +192 batches ahead. If we allow larger "jumps" (e.g. from
> 256 to 64k batches), it'd be even more visible.

I don't follow. Why would it be a problem if tuples have to go to
batches that are far away in number?

> >> Of course, this is a more complex change than the "balancing" patch. But
> >> maybe not that much, not sure. For me the main disadvantage is it
> >> doesn't really help with the batch explosion for skewed data sets (or
> >> data with many hash collisions). It can easily happen we blindly
> >> increase nbatch until we use all the bits, and then break the work_mem
> >> limit anyway.
> >>
> >> But maybe there's a way to address that - the growthEnabled=false safety
> >> is an unreliable solution, because it requires the whole batch to fall
> >> to either of the new batches. A single tuple breaks that.
> >>
> >> What if we instead compared the two new batches, and instead looked at
> >> how far the split is from 1/2? And if it's very far from 1/2, we'd
> >> either increase work_mem (a bit like the balancing), or disable nbatch
> >> increases (maybe just temporarily).
> >
> > Meaning like have some threshold for the number of tuples over the
> > limit we are? Right now, we decide to increase batches when we
> > encounter that one tuple that puts us over the limit. So, I could see
> > it making sense to decide with more foresight. Or we could even keep
> > track of the amount over the limit we are and increase the number of
> > batches once we hit that threshold.
> >
>
> Not sure I understand. I meant that we disable nbatch growth like this:
>
>    if (nfreed == 0 || nfreed == ninmemory)
>    {
>        hashtable->growEnabled = false;
>    }

Ah, right. I was thinking of the wrong thing.

> which means that it only takes a single tuple that makes it to the other
> batch to keep growing. But if 99.9999% tuples went to one of the
> batches, increasing nbatch seems pretty futile.

Right. Yes, that is unfortunate. You could do a percentage threshold.
Or if we knew how big the biggest batch is, we could decide whether or
not to disable growth based on the size the hashtable would be for
that batch vs the overhead of another doubling of nbatches.
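
Roughly like this, assuming we tracked the size of the biggest batch
(which we currently don't -- biggest_batch_bytes is made up):

    /*
     * Sketch: stop doubling once letting the hash table grow to fit the
     * biggest batch would cost less memory than the extra file buffers
     * another doubling of nbatch would add.
     */
    if (biggest_batch_bytes < 2 * hashtable->nbatch * BLCKSZ)
        hashtable->growEnabled = false;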

> But it goes in the opposite direction too. Imagine a uniform data set
> with plenty of distinct values, but correlated / sorted, and each value
> having more rows that can fit into a single batch. We'll immediately
> disable growth, which is ... not great.
>
> These are somewhat separate / independent issues, but I thin having a
> concept of "retrying the nbatch growth after a while" would help.

Yes, I think retrying nbatch growth later makes sense in this case. Or
when doubling nbatches wouldn't help split one rogue batch but would
help other big batches.

> > You could do something like this _and_ do something like close the
> > files that can't be the target of tuples from the current batch --
> > which would allow you to tolerate many more batch increases before
> > doubling the hashtable size is worth it. But it seems like the
> > algorithm to adapt the hashtable size based on the optimal tradeoff
> > between hashtable size and number of batches could be done first and
> > the patch to close files could be done later.
>
> Right. I don't think Robert's idea is a a complete answer, because it
> does not consider the tradeoff that maybe increasing work_mem would be
> better. OTOH maybe that's not something the hashjoin should worry about.
> The goal is not to optimize the work_mem value, but make sure we don't
> use significantly more memory ...

Well it's also not a complete solution because it doesn't solve the
hash collision/batch explosion case.

> If hashjoin starts to optimize this, why shouldn't the other places
> using work_mem do something similar?

Yes, I suppose other spilling operators (like hashagg) that use
buffered files may consider doing this. But I don't think that is a
reason not to use this particular strategy to "fix" this hash join
batch explosion issue.

You could make the argument that because it is the buffers and not the
actual number of batches that is the problem, we should fix it by
closing the files that aren't being used while processing a batch.
But I really like how small and isolated your sizing balance patch is.
And I actually think that the fact that it could be used to optimize
this tradeoff (work_mem/file buffers) in other places is good. Anyway,
my point was just that we could do both -- likely in any order.

- Melanie



Re: Adjusting hash join memory limit to handle batch explosion

From: Tomas Vondra
Date:

On 1/11/25 00:09, Melanie Plageman wrote:
> On Fri, Jan 10, 2025 at 11:18 AM Tomas Vondra <tomas@vondra.me> wrote:
>>
>> On 1/10/25 15:54, Melanie Plageman wrote:
>>> On Thu, Jan 9, 2025 at 6:59 PM Tomas Vondra <tomas@vondra.me> wrote:
>>> I think this is because we get the batch based on
>>>
>>> *batchno = pg_rotate_right32(hashvalue, hashtable->log2_nbuckets) &
>>> (nbatch - 1);
>>>
>>> And tuples can only spill forward. I think Robert's example is if we
>>> plan for 64 batches and eventually increase to 256 batches, a tuple
>>> assigned to batch 1 could go to 65, 129, or 193 but no other batch --
>>> meaning we would only need 3 files open when processing batch 1.
>>
>> Yes, I think that's why we only need 3 more files when splitting a
>> batch. The way I explain it is that going from 64 -> 256 adds 2 more
>> bits to the "batchno" part of the batch, and one of the patterns means
>> "current batch", so 3 new files.
>>
>> This does remind me the "one spill file per slice" patch in [1],
>> although it approaches it from a different angle. My patch defined the
>> "slice" as batches we can keep in work_mem, while Robert proposed to
>> decide how many batches we can open (first level of batching), and then
>> maybe do that recursively if needed. That seems like a fundamentally
>> more sound approach (indeed, my patch can create too many slices).
>>
>> [1]
>> https://www.postgresql.org/message-id/20190428141901.5dsbge2ka3rxmpk6%40development
>>
>>> But I think we would need to do more explicit file flushing and
>>> closing and opening, right? Which maybe doesn't matter when compared
>>> to the overhead of so many more buffers.
>> Would it be all that much flushing and closing? Yes, we'd need to flush
>> and release the buffers (which I don't think BufFiles can do right now,
>> but let's ignore that for now). But I'd hope the batches are fairly
>> large (because that's why expect to generate them), so one more write
>> should not make a lot of difference on top of the actual bach split.
>> Chances are it's far cheaper than the extra memory pressure due to
>> keeping all batches in memory ...
>>
>> I wonder if it might be a problem that those future batches are "far
>> apart". I mean, we're splitting batch 1, and the tuple may go to batches
>> +64, +128 and +192 batches ahead. If we allow larger "jumps" (e.g. from
>> 256 to 64k batches), it'd be even more visible.
> 
> I don't follow. Why would it be a problem if tuples have to go to
> batches that are far away in number?
> 

I think you're right it's not a problem, I was just thinking aloud (or
whatever you do in an e-mail).

>>>> Of course, this is a more complex change than the "balancing" patch. But
>>>> maybe not that much, not sure. For me the main disadvantage is it
>>>> doesn't really help with the batch explosion for skewed data sets (or
>>>> data with many hash collisions). It can easily happen we blindly
>>>> increase nbatch until we use all the bits, and then break the work_mem
>>>> limit anyway.
>>>>
>>>> But maybe there's a way to address that - the growthEnabled=false safety
>>>> is an unreliable solution, because it requires the whole batch to fall
>>>> to either of the new batches. A single tuple breaks that.
>>>>
>>>> What if we instead compared the two new batches, and instead looked at
>>>> how far the split is from 1/2? And if it's very far from 1/2, we'd
>>>> either increase work_mem (a bit like the balancing), or disable nbatch
>>>> increases (maybe just temporarily).
>>>
>>> Meaning like have some threshold for the number of tuples over the
>>> limit we are? Right now, we decide to increase batches when we
>>> encounter that one tuple that puts us over the limit. So, I could see
>>> it making sense to decide with more foresight. Or we could even keep
>>> track of the amount over the limit we are and increase the number of
>>> batches once we hit that threshold.
>>>
>>
>> Not sure I understand. I meant that we disable nbatch growth like this:
>>
>>    if (nfreed == 0 || nfreed == ninmemory)
>>    {
>>        hashtable->growEnabled = false;
>>    }
> 
> Ah, right. I was thinking of the wrong thing.
> 
>> which means that it only takes a single tuple that makes it to the other
>> batch to keep growing. But if 99.9999% tuples went to one of the
>> batches, increasing nbatch seems pretty futile.
> 
> Right. Yes, that is unfortunate. You could do a percentage threshold.
> Or if we knew how big the biggest batch is, we could decide whether or
> not to disable growth based on the size the hashtable would be for
> that batch vs the overhead of another doubling of nbatches.
> 

I was thinking it might be possible to express this as a formula similar
to the "balancing". I mean, something that says "just double as you
wish" when the current doubling split the batch 50:50, but delays the
next doubling if the batch gets split 99:1 (with some continuous
transition between those two extremes).

Or maybe this could also drive increasing the memory limit. Yes, there's
a chance that the next doubling will split it more evenly, but I think
it's much more likely there really are hash collisions of some sort.
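
For example, something like this when we detect a lopsided split (purely
illustrative, the exact formula would need experimenting):

    double      frac = (double) nfreed / ninmemory;
    double      skew = (frac > 0.5) ? (frac - 0.5) * 2.0 : (0.5 - frac) * 2.0;

    /*
     * A 50:50 split (skew = 0) leaves the limit alone, a 99:1 split
     * (skew close to 1) lets the hash table grow almost 2x before we
     * try to split again.
     */
    hashtable->spaceAllowed = (Size) (hashtable->spaceAllowed * (1.0 + skew));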


>> But it goes in the opposite direction too. Imagine a uniform data set
>> with plenty of distinct values, but correlated / sorted, and each value
>> having more rows that can fit into a single batch. We'll immediately
>> disable growth, which is ... not great.
>>
>> These are somewhat separate / independent issues, but I thin having a
>> concept of "retrying the nbatch growth after a while" would help.
> 
> Yes, I think retrying nbatch growth later makes sense in this case. Or
> when doubling nbatches wouldn't help split one rogue batch but would
> help other big batches.
> 

Exactly. Giving up the growth entirely seems a bit premature. I don't
think there's a principled formula to determine when to retry, but it
might be enough to try after the hash table doubles in size. That's
pretty much the "let's increase work_mem a bit" I mentioned above.

>>> You could do something like this _and_ do something like close the
>>> files that can't be the target of tuples from the current batch --
>>> which would allow you to tolerate many more batch increases before
>>> doubling the hashtable size is worth it. But it seems like the
>>> algorithm to adapt the hashtable size based on the optimal tradeoff
>>> between hashtable size and number of batches could be done first and
>>> the patch to close files could be done later.
>>
>> Right. I don't think Robert's idea is a a complete answer, because it
>> does not consider the tradeoff that maybe increasing work_mem would be
>> better. OTOH maybe that's not something the hashjoin should worry about.
>> The goal is not to optimize the work_mem value, but make sure we don't
>> use significantly more memory ...
> 
> Well it's also not a complete solution because it doesn't solve the
> hash collision/batch explosion case.
> 

I think it does, in a way. It doesn't prevent the batch explosion, of
course, it still ends with millions of batches. But it's not keeping all
the BufFiles open at the same time, so it does not use insane amounts
of memory. And it's slower than the balancing, of course.

>> If hashjoin starts to optimize this, why shouldn't the other places
>> using work_mem do something similar?
> 
> Yes, I suppose other spilling operators (like hashagg) that use
> buffered files may consider doing this. But I don't think that is a
> reason not to use this particular strategy to "fix" this hash join
> batch explosion issue.
> 
> You could make the argument that because it is the buffers and not the
> actual number of batches that is the problem, that we should fix it by
> closing the files that aren't being used while processing a batch.
> But I really like how small and isolated your sizing balance patch is.
> And I actually think that the fact that it could be used to optimize
> this tradeoff (work_mem/file buffers) in other places is good. Anyway,
> my point was just that we could do both -- likely in any order.
> 

Right. The way I'm looking at this is that the balancing patch is a strict
improvement over the current state. Robert's proposal is more principled
in that it actually tries to enforce the promised memory limit.


regards

-- 
Tomas Vondra




Re: Adjusting hash join memory limit to handle batch explosion

From: Melanie Plageman
Date:
On Sat, Jan 11, 2025 at 7:42 PM Tomas Vondra <tomas@vondra.me> wrote:
>
> I had a quiet evening yesterday, so I decided to take a stab at this and
> see how hard would it be, and how bad would the impact be. Attached is
> an experimental patch, doing the *bare* minimum for a simple query:
>
> 1) It defines a limit of 128 batches (a bit low, but also 1MB). In
> practice we'd use something like 256 - 1024, probably. Doesn't matter.
>
> 2) Ensures the initial pass over data in MultiExecPrivateHash does not
> use more than 128 batches, switches to "tooManyBatches=true" if that
> happens (and dumps the batch to file ExecHashDumpBatchToFile, even if
> it's batchno=0). And later it calls ExecHashHandleTooManyBatches() to
> increase the nbatch further.
>
> 3) Does something similar for the outer relation - if there are too many
> batches, we do ExecHashJoinRepartitionBatches() which first partitions
> into 128 batches. This only does a single pass in the WIP, though.
> Should be recursive or something.
>
> 4) Extends the BufFile API with BufFileHasBuffer/BufFileFreeBuffer so
> that the code can free the buffers. It also means the buffer needs to be
> allocated separately, not embedded in BufFile struct. (I'm a bit
> surprised it works without having to re-read the buffer after freeing
> it, but that's probably thanks to how hashjoin uses the files).

I started looking at this. Even though you do explain what it does
above, I still found it a bit hard to follow. Could you walk through
an example (like the one you gave in SQL) but explaining what happens
in the implementation? Basically what you have in 2 and 3 above but
with a specific example.

This is my understanding of what this does:
if we are at the max number of batches when building the hashtable and
we run out of space and need to double nbatches, we
1. dump the data from the current batch that is in the hashtable into a file
2. close and flush all the currently open buffiles, double the number
of batches, and then only open files for the batches we need to store
tuples from the batch we were trying to put in the hashtable when we
hit the limit (now in a temp file)

I also don't understand why ExecHashJoinRepartitionBatches() is needed
-- I think it has something to do with needing a certain number of
buffers open while processing batch 0, but what does this have to do
with the outer side of the join?

Another random question: why doesn't ExecHashHandleTooManyBatches()
free the outer batch files?

- Melanie



Re: Adjusting hash join memory limit to handle batch explosion

From: Tomas Vondra
Date:
On 1/13/25 17:32, Melanie Plageman wrote:
> On Sat, Jan 11, 2025 at 7:42 PM Tomas Vondra <tomas@vondra.me> wrote:
>>
>> I had a quiet evening yesterday, so I decided to take a stab at this and
>> see how hard would it be, and how bad would the impact be. Attached is
>> an experimental patch, doing the *bare* minimum for a simple query:
>>
>> 1) It defines a limit of 128 batches (a bit low, but also 1MB). In
>> practice we'd use something like 256 - 1024, probably. Doesn't matter.
>>
>> 2) Ensures the initial pass over data in MultiExecPrivateHash does not
>> use more than 128 batches, switches to "tooManyBatches=true" if that
>> happens (and dumps the batch to file ExecHashDumpBatchToFile, even if
>> it's batchno=0). And later it calls ExecHashHandleTooManyBatches() to
>> increase the nbatch further.
>>
>> 3) Does something similar for the outer relation - if there are too many
>> batches, we do ExecHashJoinRepartitionBatches() which first partitions
>> into 128 batches. This only does a single pass in the WIP, though.
>> Should be recursive or something.
>>
>> 4) Extends the BufFile API with BufFileHasBuffer/BufFileFreeBuffer so
>> that the code can free the buffers. It also means the buffer needs to be
>> allocated separately, not embedded in BufFile struct. (I'm a bit
>> surprised it works without having to re-read the buffer after freeing
>> it, but that's probably thanks to how hashjoin uses the files).
> 
> I started looking at this. Even though you do explain what it does
> above, I still found it a bit hard to follow. Could you walk through
> an example (like the one you gave in SQL) but explaining what happens
> in the implementation? Basically what you have in 2 and 3 above but
> with a specific example.
> 

OK, I'll try ... see the end of this message.

> This is my understanding of what this does:
> if we are at the max number of batches when building the hashtable and
> we run out of space and need to double nbatches, we
> 1. dump the data from the current batch that is in the hashtable into a file
> 2. close and flush are the currently open buffiles, double the number
> of batches, and then only open files for the batches we need to store
> tuples from the batch we were trying to put in the hashtable when we
> hit the limit (now in a temp file)
> 

Roughly, but the second step needs to happen only after we finish the
first pass over the inner relation. I'll try to explain this as part of
the example.

> I also don't understand why ExecHashJoinRepartitionBatches() is needed
> -- I think it has something to do with needing a certain number of
> buffers open while processing batch 0, but what does this have to do
> with the outer side of the join?
> 

No, this is about building batches on the outer side. We've built the
hash table, and we may have ended up with a very high nbatch. We can't
build all of them right away (would need too many buffiles), so we do
that in multiple phases, to not cross the limit.

> Another random question: why doesn't ExecHashHandleTooManyBatches()
> free the outer batch files?
> 

Because it was tailored for the example where all batch splits happen for
batch 0, before we even start processing the outer side. In practice it
probably should free the files.

Let's do the example - as I mentioned, I only tried doing this for the
case where all the batch increases happen for batch 0, before we start
building the outer batches. I'm 99% sure the patch will need to modify a
couple more places to handle batch increases in later stages.

Assume we don't want to use more than 128 batches, but that we're
running a query that needs 256 batches. The patch will do this:

1) ExecHashTableCreate will set nbatch_maximum=128 as the limit for the
current pass over the inner relation, and it'll cap the other nbatch fields
accordingly. If we already know we'll need more batches, we set
tooManyBatches=true to remember this.

But let's say we start with nbatch=64, nbatch_maximum=128 (and thus also
with tooManyBatches=false).

2) We start loading data into the hash table, until we exceed the memory
limit for the first time. We double the number of batches to 128, move some
of the data from the hash table to the new batch, and continue.

3) We hit the memory limit again, but this time we've hit

    (nbatch == nbatch_maximum)

so we can't double the number of batches. But we also can't continue
adding data to the in-memory hash table, so we set tooManyBatches=true
and we start spilling even the current batch to a file.

4) We finish the first pass over the inner relation with

    nbatch = 128
    nbatch_maximum = 128
    tooManyBatches = true

so we need to do something. We run ExecHashHandleTooManyBatches(), which
starts increasing nbatch until the current batch fits into work_mem. We
have nbatch=128, and the query needs nbatch=256, so we only do one loop.

Note: Right now it simply doubles the number of batches in each loop. But
it could be faster and multiply nbatch by up to 128x in one step:

    128 -> 16k -> 1M

The later batches will already do all the increases in a single step -
that needs an improvement too.

5) After ExecHashHandleTooManyBatches() completes, we have the inner side
of the join mostly "done". We have nbatch=256.

6) We start building batches on the outer side, but we also don't want
to build all the batches at once - we want to build 128 and only then go
to 256 (or further). This is what ExecHashJoinRepartitionBatches does.

If we have too many batches for one pass, we build 128 batches in the
first pass. And then we just read the batch files, doing further splits.
Right now this just does a single pass and thus splits the relation into
128 batches, and then just continues as before. That's enough for 256
batches, because 256 is a single step past 128.

But it really should be recursive / do multiple passes, to handle cases
with more than 16k batches (although with a higher limit it would be
less of an issue).


7) It does free the file buffers in various places. Chances are some of
those places are unnecessary, and it should be done in some more places.


As I said, I don't claim this handles all cases, especially with
splits in later batches.

Does this make it clearer?


regards

-- 
Tomas Vondra