1: 379c12b5d26 < -: ----------- oauth: Remove stale events from the kqueue multiplexer -: ----------- > 1: c5cdccfe374 oauth: Remove stale events from the kqueue multiplexer -: ----------- > 2: 7725e0c173b oauth: Ensure unused socket registrations are removed 2: f30317d7265 ! 3: 6ccf7a5d156 oauth: Remove expired timers from the multiplexer @@ src/interfaces/libpq-oauth/oauth-curl.c: pg_fe_run_oauth_flow_impl(PGconn *conn) + * edge-triggered) timeouts, and ours are level-triggered + * via the mux. + * -+ * This can't be combined with the drain_socket_events() -+ * call below: we might accidentally clear a short timeout -+ * that was both set and expired during the call to ++ * This can't be combined with the comb_multiplexer() call ++ * below: we might accidentally clear a short timeout that ++ * was both set and expired during the call to + * drive_request(). + */ + if (!drain_timer_events(actx, NULL)) 3: d243d28964d ! 4: 2be993b8f07 oauth: Track total call count during a client flow @@ Metadata ## Commit message ## oauth: Track total call count during a client flow - Tracking down the bugs that led to the addition of drain_socket_events() + Tracking down the bugs that led to the addition of comb_multiplexer() and drain_timer_events() was difficult, because an inefficient flow is not visibly different from one that is working properly. To help maintainers notice when something has gone wrong, track the number of @@ src/interfaces/libpq-oauth/oauth-curl.c: struct async_ctx bool used_basic_auth; /* did we send a client secret? */ bool debugging; /* can we give unsafe developer assistance? */ + int dbg_num_calls; /* (debug mode) how many times were we called? */ + }; - #if defined(HAVE_SYS_EVENT_H) - int nevents; /* how many events are we waiting on? */ + /* @@ src/interfaces/libpq-oauth/oauth-curl.c: PostgresPollingStatusType pg_fe_run_oauth_flow(PGconn *conn) { @@ src/interfaces/libpq-oauth/oauth-curl.c: pg_fe_run_oauth_flow(PGconn *conn) result = pg_fe_run_oauth_flow_impl(conn); + /* -+ * To assist with finding bugs in drain_socket_events() and ++ * To assist with finding bugs in comb_multiplexer() and + * drain_timer_events(), when we're in debug mode, track the total number + * of calls to this function and print that at the end of the flow. + * 4: ca6fd237653 ! 5: 50257bf32eb oauth: Add unit tests for multiplexer handling @@ src/interfaces/libpq-oauth/test-oauth-curl.c (new) + + /* + * Draining the pipe should unset the multiplexer again, once the old -+ * event is drained. ++ * event is cleared. + */ + Assert(drain_pipe(rfd, 1)); -+ Assert(drain_socket_events(actx)); ++ Assert(comb_multiplexer(actx)); + mux_is_not_ready(actx->mux, "when fd is drained"); + + /* Undo any unidirectional emulation. */ @@ src/interfaces/libpq-oauth/test-oauth-curl.c (new) + ssize_t written; + + /* -+ * Fill the pipe. Once the old writable event is drained, the mux ++ * Fill the pipe. Once the old writable event is cleared, the mux + * should not be ready. + */ + Assert((written = fill_pipe(wfd)) > 0); + printf("# pipe buffer is full at %zd bytes\n", written); + -+ Assert(drain_socket_events(actx)); ++ Assert(comb_multiplexer(actx)); + mux_is_not_ready(actx->mux, "when fd buffer is full"); + + /* Drain the pipe again. */ @@ src/interfaces/libpq-oauth/test-oauth-curl.c (new) + + /* Make sure an expired timer doesn't interfere with event draining. */ + { ++ bool expired; ++ + /* Make the rfd appear unidirectional if necessary. */ + if (bidirectional) + Assert((bidi_pipe_size = fill_pipe(rfd)) > 0); @@ src/interfaces/libpq-oauth/test-oauth-curl.c (new) + * Draining the pipe should unset the multiplexer again, once the + * old event is drained and the timer is reset. + * -+ * Order matters to avoid false negatives. First drain the socket, -+ * then unset the timer. We're trying to catch the case where the -+ * pending timer expiration event takes the place of one of the -+ * socket events we're attempting to drain. ++ * Order matters, since comb_multiplexer() doesn't have to remove ++ * stale events when active events exist. Follow the call sequence ++ * used in the code: drain the timer expiration, drain the pipe, ++ * then clear the stale events. + */ ++ Assert(drain_timer_events(actx, &expired)); + Assert(drain_pipe(rfd, 1)); -+ Assert(drain_socket_events(actx)); -+ Assert(set_timer(actx, -1)); ++ Assert(comb_multiplexer(actx)); + ++ is(expired, 1, "drain_timer_events() reports expiration"); + is(timer_expired(actx), 0, "timer is no longer expired"); + mux_is_not_ready(actx->mux, "when fd is drained and timer reset"); + @@ src/interfaces/libpq-oauth/test-oauth-curl.c (new) + if (bidirectional) + Assert(drain_pipe(wfd, bidi_pipe_size)); + } ++ ++ /* Ensure comb_multiplexer() can handle multiple stale events. */ ++ { ++ int rfd2, ++ wfd2; ++ ++ /* Create a second local pipe. */ ++ Assert(pipe(pipefd) == 0); ++ rfd2 = pipefd[0]; ++ wfd2 = pipefd[1]; ++ ++ /* Make both rfds appear unidirectional if necessary. */ ++ if (bidirectional) ++ { ++ Assert((bidi_pipe_size = fill_pipe(rfd)) > 0); ++ Assert(fill_pipe(rfd2) == bidi_pipe_size); ++ } ++ ++ /* Register for read events on both fds, and make them readable. */ ++ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0); ++ Assert(register_socket(NULL, rfd2, in_event, actx, NULL) == 0); ++ ++ Assert(write(wfd, "x", 1) == 1); ++ Assert(write(wfd2, "x", 1) == 1); ++ ++ mux_is_ready(actx->mux, deadline, "when two fds are readable"); ++ ++ /* ++ * Drain both fds. comb_multiplexer() should then ensure that the ++ * mux is no longer readable. ++ */ ++ Assert(drain_pipe(rfd, 1)); ++ Assert(drain_pipe(rfd2, 1)); ++ Assert(comb_multiplexer(actx)); ++ mux_is_not_ready(actx->mux, "when two fds are drained"); ++ ++ /* Stop listening. */ ++ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0); ++ Assert(register_socket(NULL, rfd2, CURL_POLL_REMOVE, actx, NULL) == 0); ++ ++ /* Undo any unidirectional emulation. */ ++ if (bidirectional) ++ { ++ Assert(drain_pipe(wfd, bidi_pipe_size)); ++ Assert(drain_pipe(wfd2, bidi_pipe_size)); ++ } ++ ++ close(rfd2); ++ close(wfd2); ++ } + } + + close(rfd);