7.4. Silk

7.4.1. Концепция

Silk (Shardman InterLinK) — это экспериментальная транспортная функциональность. Она подключается в тот момент, когда postgres_fdw решает передать восстановленный фрагмент запроса через соединение libpq на удалённый узел, заменяя собой соединение libpq. Эта функциональность предназначена для уменьшения количества незанятых подключений postgres_fdw во время выполнения транзакции, минимизации задержки и повышения общей пропускной способности.

Реализация Silk использует несколько обслуживающих процессов. Основной процесс маршрутизации/мультиплексирования (по одному на экземпляр PostgreSQL) называется silkroad, а группа обслуживающих процессов — silkworms. Хотя postgres_fdw использует libpq, оно порождает несколько подключений libpq от каждого обслуживающего процесса к удалённому узлу (где соответственно порождаются несколько обслуживающих процессов). Но если silk заменяет libpq, каждый процесс silkroad подключается только к одному удалённому процессу silkroad. В этой схеме удалённые silkworms играют роль удалённых обслуживающих процессов, в противном случае порождённых postgres_fdw.

Silkroad связывает локальный обслуживающий процесс с рабочими процессами удалённого узла следующим образом:

  1. Обслуживающий процесс использует обычный API postgres_fdw для доступа к удалённым данным. Но postgres_fdw, когда включён silk, записывает запрос в очередь общей памяти вместо подключения libpq;

  2. Процесс Silkroad анализирует входящую очередь общей памяти от этого обслуживающего процесса и направляет сообщение соответствующему сетевому подключению с удалённым процессом silkroad.

  3. Удалённый процесс silkroad захватывает входящее сообщение из сети и (если оно новое) перенаправляет его в доступную очередь общей памяти рабочего процесса (или в специальную очередь «неназначенных заданий», если все рабочие процессы заняты) .

  4. Наконец, удалённый рабочий процесс получает сообщение через свою очередь общей памяти, выполняет его и отправляет обратно получившиеся кортежи (или ошибку) таким же образом.

Silkroad действует здесь как обычный сетевой коммутатор, перебрасывая пакеты между общей памятью серверной части и соответствующим сетевым сокетом. Этот процесс ничего не знает о содержании сообщения, полагаясь только на его заголовок.

7.4.2. Цикл событий

Процесс Silkroad запускает цикл событий на базе библиотеки libev. Очередь общей памяти каждого обслуживающего процесса отображается в цикле обработки событий с помощью дескриптора eventfd, а каждое сетевое подключение — с помощью дескриптора сокета.

Во время запуска обслуживающий процесс регистрирует себя (свои дескрипторы eventfd) в локальном процессе silkroad. Silkroad отвечает, указывая, какие сегменты памяти использовать для очереди сообщений обслуживающего процесса. С этого момента silkroad будет реагировать на события из очереди, связанной с этим обслуживающим процессом. Сетевые подключения между локальными и удалёнными silkroads будут установлены сразу по первому запросу от обслуживающего процесса к удалённому узлу и будут существовать до тех пор, пока оба участника (процессы silkroad) существуют.

7.4.3. Маршрутизация и мультиплексирование

Для каждого подзапроса ожидается подмножество кортежей и поэтому взаимодействие внутри подзапроса представляется как двунаправленный поток данных. Silkroad использует внутреннюю таблицу маршрутизации для регистрации этих потоков. Уникальный идентификатор потока (в кластере Shardman) формируется как пара «адрес исходного узла, адрес целевого узла» и локально (внутри узла) уникальный номер. Каждый конкретный подзапрос от обслуживающего процесса к удалённым узлам будет зарегистрирован silkroad как такой поток. Таким образом, любой обслуживающий процесс может быть связан со многими потоками одновременно.

Когда локальный процесс silkroad получает от серверной части сообщение с новым идентификатором потока, он регистрирует его в локальной таблице маршрутизации, а затем перенаправляет это сообщение в соответствующий сокет. Если подключение с удалённым silkroad не существует, оно устанавливается с помощью процедуры согласования. Исходное сообщение, инициировавшее согласование, помещается в специальный внутренний буфер до тех пор, пока согласование не завершится успешно. Удалённый процесс silkroad, получив пакет с новым идентификатором, регистрирует его в своей таблице, затем назначает рабочий процесс silkworm из пула доступных рабочих процессов и помещает сообщение в очередь общей памяти рабочего процесса. Если все рабочие процессы silkworm в данный момент заняты, сообщение будет отложено, т. е. помещено в специальную «очередь неназначенных заданий» (обратите внимание, что значение параметра конфигурации shardman.silk_unassigned_job_queue_size равно 1024). Если в очереди нет свободного места, будет сгенерировано сообщение об ошибке, которое будет отправлено обратно исходному обслуживающему процессу. Задание из этой очереди будет позже назначено первому доступному рабочему процессу, когда он освободится от предыдущего задания.

Когда рабочий процесс получает новое «задание», он выполняет его через подсистему SPI, организует получившиеся кортежи в пакеты и отправляет их обратно через общую память локальному процессу silkroad. Остальное тривиально, потому что весь маршрут известен. Последний получившийся пакет с кортежами в потоке помечается как «закрывающий». Это приказ процессам silkroads стереть этот маршрут из своих таблиц.

Обратите внимание, что серверные и удалённые рабочие процессы остаются «подписанными» на свои потоки, пока они не будут явно закрыты. Таким образом, сервер имеет возможность отправить сообщение о прерывании или уведомить удалённый рабочий процесс о преждевременном закрытии транзакции. Это позволяет отбрасывать устаревшие пакеты данных, возможно оставшиеся от предыдущих прерванных транзакций.

Для просмотра текущего состояния процесса мультиплексора silkroad используются диагностические представления Silk, описанные в разделе Подраздел 6.3.3.

7.4.4. Обработка ошибок и целостность маршрута

Кроме того, таблица маршрутизации silkroad отслеживает конечные точки (серверы и сетевые подключения), которые были задействованы в том или ином потоке. Таким образом, когда какое-либо подключение закрывается, все задействованные серверы (и/или рабочие процессы) будут уведомлены об этом событии специальным сообщением об ошибке, а все маршруты/потоки, связанные с этим подключением, будут отклонены. Точно так же, если происходит сбой сервера, его очередь общей памяти отсоединяется, и silkroad реагирует, отправляя сообщения об ошибках удалённым участникам каждого потока, связанного со сбоем сервера. Таким образом, удалённые рабочие процессы выполняют бесполезную работу, когда инициатор запроса уже неактивен.

7.4.5. Передача/порционирование/разделение данных в сверхбольших кортежах

Получившиеся кортежи передаются silkworm в собственном двоичном режиме. Кортежи с атрибутом хранения external будут удалены из TOAST, но те, которые были сжаты, останутся сжатыми.

Небольшие кортежи будут организованы в порции (примерно по 256 тыс.). Большие кортежи будут разделены на части отправителем и собраны в единое целое принимающим сервером.

7.4.6. Управление потоками

Может возникнуть ситуация, при которой очередное сообщение, полученное от сервера, не помещается в целевом сетевом буфере. Или сообщение, полученное из сети, не помещается в целевую очередь общей памяти. В таком случае поток, вызвавший эту ситуацию, будет «приостановлен». Это означает, что silkroad приостанавливает реакцию на события исходной конечной точки (подключение или сервер) до тех пор, пока целевая конечная точка не обработает все свои сообщения. Остальные серверы и подключения, не затронутые этим маршрутом, продолжают работать. Принимающие модули серверов призваны свести к минимуму подобные ситуации. Сервер периодически проверяет и очищает входящую очередь, даже когда исполнитель плана занят обработкой других узлов плана. Полученные кортежи хранятся в хранилищах кортежей сервера в соответствии с узлами плана до тех пор, пока исполнитель не запросит следующий кортеж для выполнения конкретного узла плана.

Когда в целевой очереди освобождается достаточно места, приостановленный поток возобновляется, события конечной точки разблокируются, а процесс приёма и сортировки пакетов продолжается.

7.4.7. Тонкости реализации

7.4.7.1. Передача состояний и CSN

Когда postgres_fdw работает через транспорт Silk, используется только одно подключение между демонами маршрутизации silkroad для передачи пользовательских запросов рабочим процессам silkworm и получения их ответов. Каждый запрос содержит состояние транзакции, идентификатор группы репликации узла, на котором формируется запрос (координатора), сам запрос и параметры запроса (если они есть). Ответ представляет собой либо сообщение об ошибке с описанием и кодом ошибки, либо группу кортежей, за которыми следует сообщение «end of tuples» (конец кортежей). Это означает, что silkworm должен переключиться на состояние транзакции, полученное вместе с запросом, перед выполнением запроса.

На данный момент транспорт Silk используется исключительно для запросов SELECT только для чтения. Все запросы на изменение обрабатываются через обычное подключение libpq и в основном обрабатываются как все остальные DML-запросы в PostgreSQL postgres_fdw. Единственное отличие состоит в том, что когда DML-запрос обрабатывается postgres_fdw, модуль сбрасывает сохранённое состояние транзакции для записи кеша соединений, соответствующей соединению, в котором отправляется этот запрос. Кроме того, для такой записи в кеше соединений устанавливается флаг только для чтения. Когда запрос отправляется через транспорт Silk, расширение Shardman запрашивает состояние транзакции для пары идентификаторов сервера и пользователя из postgres_fdw. Если такая запись найдена в кеше соединений postgres_fdw, она не является записью кеша только для чтения и в этой записи присутствует состояние транзакции, состояние возвращается. Если его нет, postgres_fdw получает полное состояние транзакции с удалённого сервера, сохраняет его в записи кеша соединений и возвращает в расширение Shardman.

Полное состояние транзакции аналогично состоянию параллельной рабочей транзакции и содержит:

  • информацию, относящуюся к текущему пользователю (uid, имя пользователя)

  • идентификатор процесса текущего сервера

  • временную метку начала транзакции

  • CSN текущего снимка

  • флаги, указывающие на наличие событий аннулирования

  • внутреннее состояние сервера:

    • массив ComboCIDs

    • внутреннее состояние транзакции (полный идентификатор транзакции, уровень изоляции, текущий идентификатор команды и т. д.)

    • информацию о переиндексированных индексах

Если подключение не найдено в кеше подключений postgres_fdw (т. е. это новое подключение) или запись в кеше подключения помечается как доступная только для чтения, только эти характеристики формируют состояние транзакции:

  • информация, относящаяся к текущему пользователю (имя пользователя)

  • временную метку начала транзакции

  • CSN текущего снимка

  • флаги, указывающие на наличие событий аннулирования

Используя такие состояния транзакций, silkworm может присоединиться к выполняющейся транзакции или начать новую транзакцию только для чтения с предоставленным снимком CSN и получить результат.

Обратите внимание, что полное состояние транзакции можно импортировать только на тот сервер, который его экспортировал, а также, что из-за этого метода передачи состояния транзакции вы не можете использовать транспорт Silk без включения снимков CSN.

7.4.7.2. Интеграция с асинхронным механизмом FDW

В Подразделе 7.2.2 асинхронные узлы плана ForeignScan были представлены как способ оптимизации извлечения данных с нескольких узлов, если эти узлы плана были расположены под одним узлом Append. В стандартной архитектуре PostgreSQL выполнение планов узла ForeignScan реализовано с использованием сетевого протокола на основе libpq. Чтобы повысить производительность системы при передаче данных и снизить потребление ресурсов, Shardman использует другой метод обмена данными с удалёнными узлами. Механизм выполнения планов узла ForeignScan реализован с использованием протокола Silk.

Чтобы включить транспорт Silk в асинхронный исполнитель, были внесены изменения в расширение postgres_fdw. Подключаемый транспорт был реализован как набор интерфейсных функций, входящих в состав расширения Shardman. Во время выполнения обработчиков, которые взаимодействуют с удалёнными хостами, эти функции вызываются расширением postgres_fdw. Подключаемый транспорт Silk активируется, если предварительно загружено расширение Shardman и у стороннего сервера есть атрибут extended_features (применим для любого сервера FDW в кластере Shardman). В остальных случаях расширение postgres_fdw использует стандартный протокол обмена на основе расширения libpq.

Чтобы деактивировать подключаемый транспорт Silk в кластере Shardman, необходимо установить для параметра конфигурации query_engine_mode значение ENGINE_NONE.

В текущей реализации подключаемый транспорт Silk используется исключительно в запросах только для чтения, в частности во время выполнения планов узла ForeignScan. Для модификации запросов используется стандартный протокол обмена на основе расширения libpq.

При получении строк результатов выполнения запроса с использованием транспорта Silk данные сохраняются в хранилище TupleStoreState в виде полного набора результатов, который имеет тот же размер, что и возвращаемый удалённым узлом. Структура данных хранилища TupleStoreState позволяет передавать данные на диск в случае нехватки памяти. Если удалённый узел возвращает большой набор результатов, это не приводит к нехватке памяти (OOM). Как только набор результатов получен в TupleStoreState, данные копируются в буфер исполнителя ForeignScan в памяти. Размер этого буфера определяется атрибутом fetch_size стороннего сервера. Значение по умолчанию (50000 строк) можно изменить, чтобы найти баланс между производительностью (количество вызовов узла ForeignScan) и потреблением памяти.

Использование подключаемого транспорта Silk для асинхронного ядра FDW приводит к повышению производительности сетевого обмена и снижению потребления системных ресурсов за счёт их лучшего использования, включая количество сетевых подключений.