47.6. Plugins de salida de decodificación lógica #

47.6.1. Función de inicialización
47.6.2. Capacidades
47.6.3. Modos de salida
47.6.4. Devoluciones de llamada del plugin de salida
47.6.5. Funciones para generar salida

Se puede encontrar un plugin de salida de ejemplo en el subdirectorio contrib/test_decoding del árbol de fuentes de PostgreSQL.

47.6.1. Función de inicialización #

Un plugin de salida se carga cargando dinámicamente una biblioteca compartida con el nombre del plugin de salida como el nombre base de la biblioteca. Se utiliza la ruta de búsqueda de biblioteca normal para localizar la biblioteca. Para proporcionar las devoluciones de llamada (callbacks) del plugin de salida requeridas e indicar que la biblioteca es en realidad un plugin de salida, debe proporcionar una función llamada _PG_output_plugin_init. A esta función se le pasa una estructura que debe rellenarse con los punteros de función de devolución de llamada para las acciones individuales.

typedef struct OutputPluginCallbacks
{
    LogicalDecodeStartupCB startup_cb;
    LogicalDecodeBeginCB begin_cb;
    LogicalDecodeChangeCB change_cb;
    LogicalDecodeTruncateCB truncate_cb;
    LogicalDecodeCommitCB commit_cb;
    LogicalDecodeMessageCB message_cb;
    LogicalDecodeFilterByOriginCB filter_by_origin_cb;
    LogicalDecodeShutdownCB shutdown_cb;
    LogicalDecodeFilterPrepareCB filter_prepare_cb;
    LogicalDecodeBeginPrepareCB begin_prepare_cb;
    LogicalDecodePrepareCB prepare_cb;
    LogicalDecodeCommitPreparedCB commit_prepared_cb;
    LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
    LogicalDecodeStreamStartCB stream_start_cb;
    LogicalDecodeStreamStopCB stream_stop_cb;
    LogicalDecodeStreamAbortCB stream_abort_cb;
    LogicalDecodeStreamPrepareCB stream_prepare_cb;
    LogicalDecodeStreamCommitCB stream_commit_cb;
    LogicalDecodeStreamChangeCB stream_change_cb;
    LogicalDecodeStreamMessageCB stream_message_cb;
    LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;

typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);

Las devoluciones de llamada begin_cb, change_cb y commit_cb son obligatorias, mientras que startup_cb, truncate_cb, message_cb, filter_by_origin_cb y shutdown_cb son opcionales. Si truncate_cb no está configurada pero se va a decodificar un TRUNCATE, la acción se ignorará.

Un plugin de salida también puede definir funciones para admitir la transmisión de transacciones grandes en curso. Las funciones stream_start_cb, stream_stop_cb, stream_abort_cb, stream_commit_cb y stream_change_cb son obligatorias, mientras que stream_message_cb y stream_truncate_cb son opcionales. La función stream_prepare_cb también es obligatoria si el plugin de salida también admite confirmaciones en dos fases.

Un plugin de salida también puede definir funciones para admitir confirmaciones en dos fases, lo que permite decodificar acciones en el PREPARE TRANSACTION. Las devoluciones de llamada begin_prepare_cb, prepare_cb, commit_prepared_cb y rollback_prepared_cb son obligatorias, mientras que filter_prepare_cb es opcional. La función stream_prepare_cb también es obligatoria si el plugin de salida también admite la transmisión de transacciones grandes en curso.

47.6.2. Capacidades #

Para decodificar, formatear y generar cambios, los plugins de salida pueden utilizar la mayor parte de la infraestructura normal del backend, incluida la llamada a funciones de salida. Se permite el acceso de solo lectura a las relaciones siempre que solo se acceda a las relaciones que hayan sido creadas por initdb en el esquema pg_catalog, o que se hayan marcado como tablas de catálogo proporcionadas por el usuario utilizando

ALTER TABLE user_catalog_table SET (user_catalog_table = true);
CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);

Ten en cuenta que el acceso a las tablas del catálogo del usuario o a las tablas normales del catálogo del sistema en los plugins de salida debe realizarse únicamente a través de las APIs de escaneo systable_*. El acceso a través de las APIs de escaneo heap_* generará un error. Además, está prohibida cualquier acción que conduzca a la asignación de un ID de transacción. Eso, entre otras cosas, incluye escribir en tablas, realizar cambios DDL y llamar a pg_current_xact_id().

47.6.3. Modos de salida #

Las devoluciones de llamada del plugin de salida pueden pasar datos al consumidor en formatos casi arbitrarios. Para algunos casos de uso, como visualizar los cambios a través de SQL, devolver datos en un tipo de datos que pueda contener datos arbitrarios (por ejemplo, bytea) resulta engorroso. Si el plugin de salida solo genera datos textuales en la codificación del servidor, puede declararlo estableciendo OutputPluginOptions.output_type a OUTPUT_PLUGIN_TEXTUAL_OUTPUT en lugar de OUTPUT_PLUGIN_BINARY_OUTPUT en la devolución de llamada de inicio. En ese caso, todos los datos deben estar en la codificación del servidor para que un dato de tipo text pueda contenerlos. Esto se comprueba en las compilaciones con aserciones habilitadas.

47.6.4. Devoluciones de llamada del plugin de salida #

Un plugin de salida recibe notificaciones sobre los cambios que están ocurriendo a través de varias devoluciones de llamada que debe proporcionar.

Las transacciones concurrentes se decodifican en el orden de confirmación, y solo los cambios pertenecientes a una transacción específica se decodifican entre las devoluciones de llamada begin y commit. Las transacciones que se revirtieron explícita o implícitamente nunca se decodifican. Los puntos de salvaguarda (savepoints) exitosos se integran en la transacción que los contiene en el orden en que se ejecutaron dentro de esa transacción. Una transacción que se prepara para una confirmación en dos fases utilizando PREPARE TRANSACTION también se decodificará si se proporcionan las devoluciones de llamada del plugin de salida necesarias para decodificarlas. Es posible que la transacción preparada actual que se está decodificando se aborte concurrentemente a través de un comando ROLLBACK PREPARED. En ese caso, la decodificación lógica de esta transacción también se abortará. Todos los cambios de dicha transacción se omiten una vez que se detecta el aborto y se invoca la devolución de llamada prepare_cb. De este modo, incluso en caso de un aborto concurrente, se proporciona suficiente información al plugin de salida para que pueda manejar adecuadamente el ROLLBACK PREPARED una vez que se decodifique.

Note

Solo se decodificarán las transacciones que ya se hayan purgado de forma segura en el disco. Eso puede hacer que un COMMIT no se decodifique inmediatamente en una llamada a pg_logical_slot_get_changes() directamente posterior cuando synchronous_commit está establecido en off.

47.6.4.1. Devolución de llamada de inicio #

La devolución de llamada opcional startup_cb se invoca cada vez que se crea una ranura de replicación o se solicita que transmita cambios, independientemente del número de cambios que estén listos para ser generados.

typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
                                        OutputPluginOptions *options,
                                        bool is_init);

El parámetro is_init será verdadero cuando se esté creando la ranura de replicación y falso en caso contrario. options apunta a una estructura de opciones que los plugins de salida pueden configurar:

typedef struct OutputPluginOptions
{
    OutputPluginOutputType output_type;
    bool        receive_rewrites;
} OutputPluginOptions;

output_type debe establecerse a OUTPUT_PLUGIN_TEXTUAL_OUTPUT o a OUTPUT_PLUGIN_BINARY_OUTPUT. Consulta también la Section 47.6.3. Si receive_rewrites es verdadero, también se llamará al plugin de salida para los cambios realizados por las reescrituras de la pila (heap rewrites) durante ciertas operaciones DDL. Estos son de interés para los plugins que manejan la replicación de DDL, pero requieren un manejo especial.

La devolución de llamada de inicio debe validar las opciones presentes en ctx->output_plugin_options. Si el plugin de salida necesita tener un estado, puede utilizar ctx->output_plugin_private para almacenarlo.

47.6.4.2. Devolución de llamada de apagado #

La devolución de llamada opcional shutdown_cb se invoca siempre que una ranura de replicación previamente activa ya no se utilice y se puede emplear para desasignar recursos privados del plugin de salida. La ranura no se está eliminando necesariamente, simplemente se está deteniendo la transmisión.

typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);

47.6.4.3. Devolución de llamada de inicio de transacción #

La devolución de llamada obligatoria begin_cb se invoca cada vez que se ha decodificado el inicio de una transacción confirmada. Las transacciones abortadas y su contenido nunca se decodifican.

typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
                                      ReorderBufferTXN *txn);

El parámetro txn contiene metainformación sobre la transacción, como la marca de tiempo en la que se confirmó y su XID.

47.6.4.4. Devolución de llamada de finalización de transacción #

La devolución de llamada obligatoria commit_cb se invoca siempre que se ha decodificado la confirmación de una transacción. Las devoluciones de llamada change_cb para todas las filas modificadas se habrán llamado antes de esto, si ha habido filas modificadas.

typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       XLogRecPtr commit_lsn);

47.6.4.5. Devolución de llamada de cambio #

La devolución de llamada obligatoria change_cb se invoca para cada modificación individual de fila dentro de una transacción, ya sea un INSERT, UPDATE o DELETE. Incluso si el comando original modificó varias filas a la vez, la devolución de llamada se invocará individualmente para cada fila. La devolución de llamada change_cb puede acceder a las tablas del catálogo del sistema o del usuario para ayudar en el proceso de salida de los detalles de la modificación de la fila. En caso de decodificar una transacción preparada (pero aún no confirmada) o decodificar una transacción no confirmada, esta devolución de llamada de cambio también podría fallar debido a la reversión simultánea de esta misma transacción. En ese caso, la decodificación lógica de esta transacción abortada se detiene de forma controlada.

typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       Relation relation,
                                       ReorderBufferChange *change);

Los parámetros ctx y txn tienen el mismo contenido que para las devoluciones de llamada begin_cb y commit_cb, pero además se pasa el descriptor de relación relation que apunta a la relación a la que pertenece la fila y una estructura change que describe la modificación de la fila.

Note

Utilizando la decodificación lógica solo se pueden extraer los cambios en las tablas definidas por el usuario que no sean no registradas (unlogged) (consulta la UNLOGGED) y no temporales (consulta la TEMPORARY o TEMP).

47.6.4.6. Devolución de llamada de truncamiento #

La devolución de llamada opcional truncate_cb se invoca para un comando TRUNCATE.

typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
                                         ReorderBufferTXN *txn,
                                         int nrelations,
                                         Relation relations[],
                                         ReorderBufferChange *change);

Los parámetros son análogos a los de la devolución de llamada change_cb. Sin embargo, debido a que las acciones TRUNCATE en las tablas conectadas por claves foráneas deben ejecutarse juntas, esta devolución de llamada recibe un array de relaciones en lugar de una sola. Consulta la descripción de la sentencia TRUNCATE para obtener detalles.

47.6.4.7. Devolución de llamada de filtro de origen #

La devolución de llamada opcional filter_by_origin_cb se invoca para determinar si los datos que se han reproducido desde origin_id son de interés para el plugin de salida.

typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
                                               RepOriginId origin_id);

El parámetro ctx tiene el mismo contenido que para las demás devoluciones de llamada. No hay más información disponible que el origen. Para indicar que los cambios originados en el nodo pasado son irrelevantes, devuelve verdadero, lo que hará que se filtren; de lo contrario, devuelve falso. Las otras devoluciones de llamada no se invocarán para las transacciones y cambios que se hayan filtrado.

Esto es útil cuando se implementan soluciones de replicación en cascada o multidireccionales. El filtrado por origen permite evitar la replicación de los mismos cambios de ida y vuelta en tales configuraciones. Aunque las transacciones y los cambios también llevan información sobre el origen, el filtrado a través de esta devolución de llamada es notablemente más eficiente.

47.6.4.8. Devolución de llamada de mensaje genérico #

La devolución de llamada opcional message_cb se invoca cada vez que se ha decodificado un mensaje de decodificación lógica.

typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
                                        XLogRecPtr message_lsn,
                                        bool transactional,
                                        const char *prefix,
                                        Size message_size,
                                        const char *message);

El parámetro txn contiene metainformación sobre la transacción, como la marca de tiempo en la que se confirmó y su XID. Ten en cuenta, sin embargo, que puede ser NULL cuando el mensaje no es transaccional y el XID aún no se había asignado en la transacción que registró el mensaje. El parámetro lsn tiene la ubicación WAL del mensaje. El parámetro transactional indica si el mensaje se envió como transaccional o no. Al igual que la devolución de llamada de cambio, en caso de decodificar una transacción preparada (pero aún no confirmada) o decodificar una transacción no confirmada, esta devolución de llamada de mensaje también podría fallar debido a la reversión simultánea de esta misma transacción. En ese caso, la decodificación lógica de esta transacción abortada se detiene de forma controlada. El parámetro prefix es un prefijo arbitrario terminado en nulo que se puede utilizar para identificar mensajes interesantes para el plugin actual. Y finalmente, el parámetro message contiene el mensaje real de tamaño message_size.

Se debe tener especial cuidado para asegurar que el prefijo que el plugin de salida considera interesante sea único. El uso del nombre de la extensión o del propio plugin de salida suele ser una buena opción.

47.6.4.9. Devolución de llamada de filtro de preparación #

La devolución de llamada opcional filter_prepare_cb se invoca para determinar si los datos que forman parte de la transacción actual de confirmación en dos fases deben considerarse para decodificarse en esta etapa de preparación o más adelante como una transacción normal de una fase en el momento del COMMIT PREPARED. Para indicar que la decodificación debe omitirse, devuelve true; de lo contrario, devuelve false. Cuando la devolución de llamada no está definida, se asume false (es decir, sin filtrado, todas las transacciones que utilizan la confirmación en dos fases se decodifican también en dos fases).

typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
                                              TransactionId xid,
                                              const char *gid);

El parámetro ctx tiene el mismo contenido que para las demás devoluciones de llamada. Los parámetros xid y gid proporcionan dos formas diferentes de identificar la transacción. El posterior COMMIT PREPARED o ROLLBACK PREPARED lleva ambos identificadores, lo que proporciona al plugin de salida la opción de elegir cuál utilizar.

La devolución de llamada se puede invocar varias veces por transacción para decodificar y debe proporcionar la misma respuesta estática para un par determinado de xid y gid cada vez que se invoque.

47.6.4.10. Devolución de llamada de inicio de preparación de transacción #

La devolución de llamada obligatoria begin_prepare_cb se invoca cada vez que se ha decodificado el inicio de una transacción preparada. El campo gid, que forma parte del parámetro txn, se puede utilizar en esta devolución de llamada para comprobar si el plugin ya ha recibido este PREPARE, en cuyo caso puede generar un error o ignorar los cambios restantes de la transacción.

typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
                                             ReorderBufferTXN *txn);

47.6.4.11. Devolución de llamada de preparación de transacción #

La devolución de llamada obligatoria prepare_cb se invoca siempre que se ha decodificado una transacción preparada para la confirmación en dos fases. La devolución de llamada change_cb para todas las filas modificadas se habrá llamado antes de esto, si ha habido filas modificadas. El campo gid, que forma parte del parámetro txn, se puede utilizar en esta devolución de llamada.

typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
                                        XLogRecPtr prepare_lsn);

47.6.4.12. Devolución de llamada de transacción preparada confirmada #

La devolución de llamada obligatoria commit_prepared_cb se invoca siempre que se ha decodificado un COMMIT PREPARED de transacción. El campo gid, que forma parte del parámetro txn, se puede utilizar en esta devolución de llamada.

typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
                                               ReorderBufferTXN *txn,
                                               XLogRecPtr commit_lsn);

47.6.4.13. Devolución de llamada de transacción preparada revertida #

La devolución de llamada obligatoria rollback_prepared_cb se invoca siempre que se ha decodificado un ROLLBACK PREPARED de transacción. El campo gid, que forma parte del parámetro txn, se puede utilizar en esta devolución de llamada. Los parámetros prepare_end_lsn y prepare_time se pueden utilizar para comprobar si el plugin ha recibido este PREPARE TRANSACTION, en cuyo caso puede aplicar la reversión; de lo contrario, puede omitir la operación de reversión. El gid por sí solo no es suficiente porque el nodo de destino puede tener una transacción preparada con el mismo identificador.

typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
                                                 ReorderBufferTXN *txn,
                                                 XLogRecPtr prepare_end_lsn,
                                                 TimestampTz prepare_time);

47.6.4.14. Devolución de llamada de inicio de transmisión #

La devolución de llamada obligatoria stream_start_cb se invoca al abrir un bloque de cambios transmitidos de una transacción en curso.

typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
                                            ReorderBufferTXN *txn);

47.6.4.15. Devolución de llamada de parada de transmisión #

La devolución de llamada obligatoria stream_stop_cb se invoca al cerrar un bloque de cambios transmitidos de una transacción en curso.

typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
                                           ReorderBufferTXN *txn);

47.6.4.16. Devolución de llamada de aborto de transmisión #

La devolución de llamada obligatoria stream_abort_cb se invoca para abortar una transacción previamente transmitida.

typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
                                            ReorderBufferTXN *txn,
                                            XLogRecPtr abort_lsn);

47.6.4.17. Devolución de llamada de preparación de transmisión #

La devolución de llamada stream_prepare_cb se invoca para preparar una transacción previamente transmitida como parte de una confirmación en dos fases. Esta devolución de llamada es obligatoria cuando el plugin de salida admite tanto la transmisión de transacciones grandes en curso como las confirmaciones en dos fases.

typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
                                              ReorderBufferTXN *txn,
                                              XLogRecPtr prepare_lsn);

47.6.4.18. Devolución de llamada de confirmación de transmisión #

La devolución de llamada obligatoria stream_commit_cb se invoca para confirmar una transacción previamente transmitida.

typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
                                             ReorderBufferTXN *txn,
                                             XLogRecPtr commit_lsn);

47.6.4.19. Devolución de llamada de cambio de transmisión #

La devolución de llamada obligatoria stream_change_cb se invoca al enviar un cambio en un bloque de cambios transmitidos (delimitados por las llamadas a stream_start_cb y stream_stop_cb). Los cambios reales no se muestran ya que la transacción puede abortarse en un momento posterior y no decodificamos cambios para transacciones abortadas.

typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
                                             ReorderBufferTXN *txn,
                                             Relation relation,
                                             ReorderBufferChange *change);

47.6.4.20. Devolución de llamada de mensaje de transmisión #

La devolución de llamada opcional stream_message_cb se invoca al enviar un mensaje genérico en un bloque de cambios transmitidos (delimitados por las llamadas a stream_start_cb y stream_stop_cb). El contenido del mensaje para los mensajes transaccionales no se muestra ya que la transacción puede abortarse en un momento posterior y no decodificamos cambios para transacciones abortadas.

typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
                                              ReorderBufferTXN *txn,
                                              XLogRecPtr message_lsn,
                                              bool transactional,
                                              const char *prefix,
                                              Size message_size,
                                              const char *message);

47.6.4.21. Devolución de llamada de truncamiento de transmisión #

La devolución de llamada opcional stream_truncate_cb se invoca para un comando TRUNCATE en un bloque de cambios transmitidos (delimitados por las llamadas a stream_start_cb y stream_stop_cb).

typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
                                               ReorderBufferTXN *txn,
                                               int nrelations,
                                               Relation relations[],
                                               ReorderBufferChange *change);

Los parámetros son análogos a los de la devolución de llamada stream_change_cb. Sin embargo, debido a que las acciones TRUNCATE en las tablas conectadas por claves foráneas deben ejecutarse juntas, esta devolución de llamada recibe un array de relaciones en lugar de una sola. Consulta la descripción de la sentencia TRUNCATE para obtener detalles.

47.6.5. Funciones para generar salida #

Para generar salida realmente, los plugins de salida pueden escribir datos en el búfer de salida StringInfo en ctx->out cuando están dentro de las devoluciones de llamada begin_cb, commit_cb o change_cb. Antes de escribir en el búfer de salida, se debe llamar a OutputPluginPrepareWrite(ctx, last_write), y después de terminar de escribir en el búfer, se debe llamar a OutputPluginWrite(ctx, last_write) para realizar la escritura. El parámetro last_write indica si una escritura en particular fue la última del plugin de devolución de llamada.

El siguiente ejemplo muestra cómo enviar datos al consumidor de un plugin de salida:

OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
OutputPluginWrite(ctx, true);