Claro!
Obrigado!
Um abraço!
wallace reis wrote:
On 12/18/06, Php developer
<brunophpdeveloper@...>
wrote:
Não deu certo amigão. Fica na
mesma!
Obrigado de qualquer maneira!
Se alguém tiver alguma outra sugestão, agradeço!!
Você pode enviar o código da função?
--
wallace reis
Núcleo de Biologia Computacional e
Gestão de Informações Biotecnológicas/LABBI
_______________________________________________
Grupo de Usuários do PostgreSQL no Brasil
Antes de perguntar consulte o manual
http://pgdocptbr.sourceforge.net/
Para editar suas opções ou sair da lista acesse a página da lista em:
http://pgfoundry.org/mailman/listinfo/brasil-usuarios
--
-- ATTENTION: This is a free software.
-- View the LICENSE.txt file for license information
--
------------------------------------------------------------------------------
-- TABLE: public.pg_dblinks
--
-- Table that will hold the DBLINK connection info
-- Only visible to the 'postgres' user
------------------------------------------------------------------------------
CREATE TABLE public.pg_dblinks (
dblinkid serial,
dblinkname text,
datasource text,
username text,
password text,
attributes text,
ctime timestamp DEFAULT now(),
CONSTRAINT pg_dblink_pk PRIMARY KEY (dblinkid)
);
CREATE UNIQUE INDEX pg_dblinks_uix ON public.pg_dblinks(dblinkname);
COMMENT ON TABLE public.pg_dblinks IS
$$
This table contains the necessary connection information for a DBI
connection.
$$;
------------------------------------------------------------------------------
-- TABLE: public.pg_snapshots
--
-- Table that will hold the SNAPSHOT info
------------------------------------------------------------------------------
CREATE TABLE public.pg_snapshots (
snapid serial,
schemaname text not null,
snapshotname text not null,
query text not null,
dblinkid bigint,
ctime timestamp default now(),
snaptime timestamp,
elapsedtime interval,
auto_time time,
auto_interval interval,
kind char(1) check (kind in ('F', 'C', 'R')) default 'R', --F=FAST C=COMPLETE R=FORCE
pbt_table text, -- ON PREBUILT TABLE (prebuilt table name)
CONSTRAINT pg_snapshots_pk primary key (snapid),
CONSTRAINT pg_snapshots_dblinks_fk foreign key (dblinkid) references public.pg_dblinks(dblinkid) ON UPDATE RESTRICT ON DELETE RESTRICT
);
CREATE UNIQUE INDEX pg_snapshots_uix ON public.pg_snapshots(schemaname, snapshotname);
CREATE INDEX pg_snapshots_1_ix ON public.pg_snapshots(schemaname, pbt_table);
GRANT select ON public.pg_snapshots to public;
COMMENT ON TABLE public.pg_snapshots IS
$$
This table contains the list of snapshots on your system a the
query/dblink used to create it and to fill it up.
$$;
------------------------------------------------------------------------------
-- TABLE: public.pg_mlogs
--
-- Table that will hold the SNAPSHOT LOG info
------------------------------------------------------------------------------
CREATE TABLE public.pg_mlogs (
snaplogid serial,
masterschema text NOT NULL,
mastername text NOT NULL,
flag numeric NOT NULL,
log text NOT NULL,
CONSTRAINT pg_mlogs_pk PRIMARY KEY (snaplogid)
);
CREATE UNIQUE INDEX pg_mlogs_uix ON public.pg_mlogs(masterschema, mastername);
GRANT select ON public.pg_mlogs to public;
COMMENT ON TABLE public.pg_mlogs IS
$$
This table contains the list of snapshot logs.
$$;
------------------------------------------------------------------------------
-- TABLE: public.pg_mlog_refcols
--
-- Table that will hold the SNAPSHOT LOG columns info
------------------------------------------------------------------------------
CREATE TABLE public.pg_mlog_refcols (
snaplogid numeric,
masterschema text NOT NULL,
mastername text NOT NULL,
colname text NOT NULL,
oldest timestamp NOT NULL DEFAULT now(),
flag numeric(1),
CONSTRAINT pg_mlog_refcols_pk PRIMARY KEY (snaplogid, colname),
CONSTRAINT pg_mlog_refcols_mlog_fk FOREIGN KEY (snaplogid) REFERENCES public.pg_mlogs(snaplogid) ON DELETE CASCADE
);
CREATE UNIQUE INDEX pg_mlog_refcols_uix ON public.pg_mlog_refcols(masterschema, mastername, colname);
GRANT select ON public.pg_mlog_refcols to public;
COMMENT ON TABLE public.pg_mlog_refcols IS
$$
This table contains the list of snapshot logs.
$$;
------------------------------------------------------------------------------
-- TABLE: public.pg_slogs
--
-- Table that will hold the SNAPSHOT LOG refresh info
------------------------------------------------------------------------------
CREATE TABLE public.pg_slogs (
snaplogid bigint,
snapid bigint,
snaptime timestamp not null,
userid int4 not null,
CONSTRAINT pg_slogs_pk primary key (snaplogid, snapid),
CONSTRAINT pg_slogs_mlog_fk FOREIGN KEY (snaplogid) REFERENCES public.pg_mlogs(snaplogid) ON DELETE CASCADE,
CONSTRAINT pg_slogs_snap_fk FOREIGN KEY (snapid) REFERENCES public.pg_snapshots(snapid) ON DELETE CASCADE
);
CREATE UNIQUE INDEX pg_slogs_ix_1 on public.pg_slogs(snapid, snaplogid);
CREATE UNIQUE INDEX pg_slogs_ix_2 on public.pg_slogs(snapid, snaptime, snaplogid);
CREATE UNIQUE INDEX pg_slogs_ix_3 on public.pg_slogs(snaplogid, snaptime, snapid);
GRANT select ON public.pg_slogs to public;
COMMENT ON TABLE public.pg_slogs IS
$$
This table contains the list of master tables that have snapshot logs and are referenced by snapshots elsewhere. This table, along with the master table log, allows fast refreshes of snapshots based on snapshot logs.
$$;
--
-- ATTENTION: This is a free software.
-- View the LICENSE.txt file for license information
--
------------------------------------------------------------------------------
-- FUNCTION: public.create_dblink
--
-- Creates a DATABASE LINK to any databases supported by PERL
------------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION public.create_dblink(dblinkname text, datasource text, username text, password text, attributes text)
RETURNS bool AS
$BODY$
use strict;
use DBI;
use constant TRUE => 1;
use constant FALSE => "";
#-- Function parameters
my ($dblinkname, $datasource, $username, $password, $attributes) = @_;
$dblinkname = lc($dblinkname);
my $attr_href = eval($attributes);
#-- Set this to 1 for debugging messages
$main::DEBUG=0;
#-- Localhost superuser connection
my $dbh_local = DBI->connect('dbi:Pg:dbname=labpeTeste', 'postgres', undef, {AutoCommit => 0});
#-- Remote database connection
my $dbh;
#-- variables
my $sql;
my $sth;
#-- Test if DBLINK already exists
if (dblinkExists($dbh_local, $dblinkname)) {
elog ERROR, "DBLink $dblinkname already created";
}
#-- Try to connect to the remote database
$dbh = dbiGetConnection($datasource, $username, $password, $attr_href);
$dbh->disconnect;
#-- Create the DBLINK entry on public.pg_dblinks table
$sql = <<SQL;
INSERT INTO public.pg_dblinks(dblinkname, datasource, username, password, attributes)
VALUES (?, ?, ?, ?, ?)
SQL
$sth = $dbh_local->prepare($sql);
$sth->execute(($dblinkname, $datasource, $username, $password, $attributes));
if (! $sth->err) {
elog NOTICE, "DBLink created" if $main::DEBUG==1;
} else {
elog ERROR, "Could not create DBLink '$dblinkname' ERROR=". $sth->errstr;
}
$dbh_local->commit;
$dbh_local->disconnect;
#-- All done. Let's return TRUE
return TRUE;
#--
#-- SUB: sqlLookup
#-- Returns a one row result from a SQL query
#--
sub sqlLookup {
my ($dbh, $sql, @params) = @_;
return sqlLookup2 ($dbh, $sql, \@params, TRUE);
}
sub sqlLookupSoft {
my ($dbh, $sql, @params) = @_;
return sqlLookup2 ($dbh, $sql, \@params, FALSE);
}
sub sqlLookup2 {
my ($dbh, $sql, $params, $errorsAreFatal) = @_;
my $sth;
my @row;
elog NOTICE, "Lookup at SQL:$sql with ".((@$params ne 0) ? join(',', @$params) : '<none>') if $main::DEBUG==1;
$sth = $dbh->prepare($sql);
if (@$params ne 0) {
$sth->execute(@$params);
} else {
$sth->execute();
}
if ($sth->err) {
if ($errorsAreFatal) {
elog ERROR, "Fatal error! SQL=$sql ERROR=".$sth->errstr;
}
}
my @row = $sth->fetchrow_array;
$sth->finish;
return @row;
}
#--
#-- SUB: dblinkExists
#-- Returns whether a DBLINK exists or not
#--
sub dblinkExists {
my ($dbh, $dblinkname) = @_;
# local variables
my $sql;
my $sth;
my $row;
$sql = <<SQL;
SELECT count(*) as total
FROM public.pg_dblinks
WHERE dblinkname=?
SQL
$sth = $dbh->prepare($sql);
if ($dbh->err) {
elog ERROR, $dbh->errstr;
}
$sth->execute(($dblinkname));
if ($sth->err) {
elog ERROR, "Could not find public.pg_dblink! ";
}
$row = $sth->fetchrow_hashref;
return ($row->{'total'} ne 0);
}
#--
#-- SUB: dbiGetConnection
#-- Connects to a DBI datasource and returns a connection handle
#--
sub dbiGetConnection {
my ($datasource, $username, $password, $attributes) = @_;
# local variables
my $dbh;
$dbh = DBI->connect(
$datasource
, $username
, $password
, $attributes
);
if ($DBI::errstr) {
elog ERROR, <<ERR;
Could not connect to database
data source: $datasource
user: $username
password: $password
dbh attributes:
$attributes
$DBI::errstr
ERR
}
return $dbh;
}
$BODY$
LANGUAGE 'plperlu' VOLATILE;
ALTER FUNCTION public.create_dblink(dblinkname text, datasource text, username text, password text, attributes text) OWNER TO postgres;
COMMENT ON FUNCTION public.create_dblink(dblinkname text, datasource text, username text, password text, attributes text) IS $$
This function is part of PostgreSQL::Snapshots project.
This is the function that creates a dblink. First it test the connection then it adds a record on the public.pg_dblinks table.
$$;
------------------------------------------------------------------------------
-- FUNCTION: public.drop_dblink
--
-- Removes a previously created DATABASE LINK
------------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION public.drop_dblink(dblinkname text)
RETURNS bool AS
$BODY$
use strict;
use DBI;
use constant TRUE => 1;
use constant FALSE => "";
#-- Function parameters
my ($dblinkname) = @_;
$dblinkname = lc($dblinkname);
#-- Set this to 1 for debugging messages
$main::DEBUG=0;
#-- Localhost superuser connection
my $dbh_local = DBI->connect('dbi:Pg:dbname=labpeTeste', 'postgres', undef, {AutoCommit => 0});
#-- variables
my $sql;
my $sth;
#-- Lets make shure that the DBLINK exists
if (! dblinkExists($dbh_local, $dblinkname)) {
elog ERROR, "DBLink '$dblinkname' does not exist";
}
#-- Delete the DBLINK entry
$sql = <<SQL;
DELETE FROM public.pg_dblinks WHERE dblinkname=?
SQL
$sth = $dbh_local->prepare($sql);
$sth->execute(($dblinkname));
if (! $sth->err) {
elog NOTICE, "DBLink removed" if $main::DEBUG==1;
} else {
elog ERROR, "Could not remove DBLink '$dblinkname' ERROR=".$sth->errstr;
}
$dbh_local->commit;
$dbh_local->disconnect;
#-- All done. Let's return TRUE
return TRUE;
#--
#-- SUB: dblinkExists
#-- Returns whether a DBLINK exists or not
#--
sub dblinkExists {
my ($dbh, $dblinkname) = @_;
# local variables
my $sql;
my $sth;
my $row;
$sql = <<SQL;
SELECT count(*) as total
FROM public.pg_dblinks
WHERE dblinkname=?
SQL
$sth = $dbh->prepare($sql);
if ($dbh->err) {
elog ERROR, $dbh->errstr;
}
$sth->execute(($dblinkname));
if ($sth->err) {
elog ERROR, "Could not find public.pg_dblink! ";
}
$row = $sth->fetchrow_hashref;
return ($row->{'total'} ne 0);
}
#--
#-- SUB: sqlLookup
#-- Returns a one row result from a SQL query
#--
sub sqlLookup {
my ($dbh, $sql, @params) = @_;
return sqlLookup2 ($dbh, $sql, \@params, TRUE);
}
sub sqlLookupSoft {
my ($dbh, $sql, @params) = @_;
return sqlLookup2 ($dbh, $sql, \@params, FALSE);
}
sub sqlLookup2 {
my ($dbh, $sql, $params, $errorsAreFatal) = @_;
my $sth;
my @row;
elog NOTICE, "Lookup at SQL:$sql with ".((@$params ne 0) ? join(',', @$params) : '<none>') if $main::DEBUG==1;
$sth = $dbh->prepare($sql);
if (@$params ne 0) {
$sth->execute(@$params);
} else {
$sth->execute();
}
if ($sth->err) {
if ($errorsAreFatal) {
elog ERROR, "Fatal error! SQL=$sql ERROR=".$sth->errstr;
}
}
my @row = $sth->fetchrow_array;
$sth->finish;
return @row;
}
$BODY$
LANGUAGE 'plperlu' VOLATILE;
ALTER FUNCTION public.drop_dblink(dblinkname text) OWNER TO postgres;
COMMENT ON FUNCTION public.drop_dblink(dblinkname text) IS $$
This function is part of PostgreSQL::Snapshots project.
This is the function that removes a dblink.
$$;
--
-- ATTENTION: This is a free software.
-- View the LICENSE.txt file for license information
--
------------------------------------------------------------------------------
-- FUNCTION: public.create_snapshot
--
-- Creates a SNAPSHOT based on a LOCAL query or on a DBLINK query
-- TODO: ON PREBUILT TABLE -> may be child OO table ?
------------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION public.create_snapshot(schemaname text, snapshotname text, query text, dblink text, kind text, pbt_table text)
RETURNS bool AS
$BODY$
use strict;
use DBI;
use constant TRUE => 1;
use constant FALSE => "";
#-- Function parameters
my ($schemaname, $snapshotname, $query, $dblinkname, $kindname, $pbt_table) = @_;
$schemaname = lc($schemaname);
$snapshotname = lc($snapshotname);
$dblinkname = lc($dblinkname);
$kindname = lc($kindname);
$pbt_table = lc($pbt_table);
#-- Set this to 1 for debugging messages
$main::DEBUG=0;
#-- Remote database connection
my $dbh;
#-- Localhost superuser connection
my $dbh_local = DBI->connect('dbi:Pg:dbname=labpeTeste', 'postgres', undef, {AutoCommit => 0});
my $sql = '';
my $sth;
my $row;
my $kind;
my $pbt;
if ($kindname eq 'fast') {
$kind = 'F';
} elsif ($kindname eq 'complete') {
$kind = 'C';
} elsif ($kindname eq 'force') {
$kind = 'R';
} else {
elog ERROR, "KIND of refresh should be 'FAST', 'COMPLETE' or 'FORCE' !";
}
if ("$pbt_table" eq '') {
$pbt = '';
} else {
$pbt = $pbt_table;
}
#-- Test if the SNAPSHOT entry already exists
if (snapshotExists($dbh_local, $schemaname, $snapshotname)) {
elog ERROR, "Snapshot '$snapshotname' already created";
}
if ($pbt eq 'N') {
#-- Test if another OBJECT exists on the same place
if (objectExists($dbh_local, $schemaname, $snapshotname)) {
elog ERROR, "An object at '$schemaname.$snapshotname' already exists";
}
} else {
#-- Test if the PREBUILT TABLE exists
if (! objectExists($dbh_local, $schemaname, $pbt_table)) {
elog ERROR, "Prebuilt table '$schemaname.$pbt_table' does not exist!";
}
}
if ("$dblinkname" eq '') {
#-- create local
elog NOTICE, 'Creating LOCAL snapshot' if $main::DEBUG==1;
createLocalSnapshot($dbh_local, $schemaname, $snapshotname, $query, $pbt);
} else {
#-- create remote
elog NOTICE, 'Creating REMOTE(DBLINK) snapshot' if $main::DEBUG==1;
createRemoteSnapshot($dbh_local, $schemaname, $snapshotname, $query, $dblinkname, $pbt);
}
#-- get the DBLINK id
my $dblinkid = getDblinkId($dbh_local, $dblinkname);
#-- insert the snapshot into the catalog
$sql = <<SQL;
INSERT INTO public.pg_snapshots(schemaname, snapshotname, query, dblinkid, kind, pbt_table)
VALUES (?, ?, ?, ?, ?, ?)
SQL
$sth = $dbh_local->prepare($sql);
$sth->execute(($schemaname, $snapshotname, $query, $dblinkid, $kind, $pbt));
if (! $sth->err) {
elog NOTICE, "Snapshot entry created" if $main::DEBUG==1;
} else {
elog ERROR, "Could not create snapshot entry. SQL=$sql ERROR=".$sth->errstr;
}
$dbh_local->commit;
$dbh_local->disconnect;
#-- All done. Let's return TRUE
return TRUE;
#--
#-- SUB: sqlLookup
#-- Returns a one row result from a SQL query
#--
sub sqlLookup {
my ($dbh, $sql, @params) = @_;
return sqlLookup2 ($dbh, $sql, \@params, TRUE);
}
sub sqlLookupSoft {
my ($dbh, $sql, @params) = @_;
return sqlLookup2 ($dbh, $sql, \@params, FALSE);
}
sub sqlLookup2 {
my ($dbh, $sql, $params, $errorsAreFatal) = @_;
my $sth;
my @row;
elog NOTICE, "Lookup at SQL:$sql with ".((@$params ne 0) ? join(',', @$params) : '<none>') if $main::DEBUG==1;
$sth = $dbh->prepare($sql);
if (@$params ne 0) {
$sth->execute(@$params);
} else {
$sth->execute();
}
if ($sth->err) {
if ($errorsAreFatal) {
elog ERROR, "Fatal error! SQL=$sql ERROR=".$sth->errstr;
}
}
my @row = $sth->fetchrow_array;
$sth->finish;
return @row;
}
#--
#-- SUB: snapshotExists
#-- Returns whether a snapshot exists or not
#--
sub snapshotExists {
#-- Function parameters
my ($dbh, $schemaname, $snapshotname) = @_;
# local variables
my $sql;
my $sth;
my $row;
$sql = <<SQL;
SELECT count(*) as total
FROM public.pg_snapshots
WHERE schemaname=?
and snapshotname=?
SQL
$sth = $dbh->prepare($sql);
$sth->execute(($schemaname, $snapshotname));
if ($sth->err) {
elog ERROR, "Could not find public.pg_snapshots !!!";
}
$row = $sth->fetchrow_hashref;
return ($row->{'total'} ne 0);
}
#--
#-- SUB: objectExists
#-- Returns whether an object exists or not
#--
sub objectExists {
my ($dbh, $schemaname, $snapshotname) = @_;
# local variables
my $sql;
my $sth;
my $row;
$sql = <<SQL;
SELECT count(*) as total
FROM pg_catalog.pg_class c
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname=? AND c.relname = ?
SQL
$sth = $dbh->prepare($sql);
$sth->execute(($schemaname, $snapshotname));
$row = $sth->fetchrow_hashref;
return ($row->{total} ne 0);
}
#--
#-- SUB: getDblinkId
#-- Maps a DBLINK name to a DBLINK id
#--
sub getDblinkId {
#-- Function parameters
my ($dbh, $dblinkname) = @_;
#-- Local variables
my $sql;
my $dblink;
my $sth;
if ("$dblinkname" eq '') {
return undef;
}
#--Get DBLINK Id
$sql = 'SELECT dblinkid FROM public.pg_dblinks WHERE dblinkname=?';
$sth = $dbh->prepare($sql);
$sth->execute(($dblinkname));
#-- Fetch the dblink info
$dblink = $sth->fetchrow_hashref;
return $dblink->{'dblinkid'};
}
#--
#-- SUB: createLocalSnapshot
#-- Creates a snapshot based on a local database query
#--
sub createLocalSnapshot {
#-- Function parameters
my ($dbh, $schemaname, $snapshotname, $query, $pbt) = @_;
#-- Local variables
my $sql;
my $sth;
if ($pbt eq 'N') {
#-- Creates an empty table based on the query
$sql = <<SQL;
CREATE TABLE $schemaname.$snapshotname AS
SELECT query.*
FROM ($query) query
WHERE 1=0
SQL
$sth = spi_exec_query($sql);
if ($sth->{status} eq 'SPI_OK_SELINTO') {
elog NOTICE, "Snapshot placeholder created" if $main::DEBUG==1;
} else {
elog ERROR, "Could not create snapshot placeholder: $sql [".$sth->status."]";
}
} else {
my $snapshotname = $pbt;
#-- execute the query WHERE 1=0
$sql = <<SQL;
SELECT query.*
FROM ($query) query
WHERE 1=0
SQL
$sth = $dbh->prepare($sql);
$sth->execute;
if ($sth->err) {
elog ERROR, "Could not execute query: $sql [$sth->{errstr}]";
}
my $sth1;
my $columns;
my @cols = ();
for ( my $i = 0 ; $i < $sth->{NUM_OF_FIELDS} ; $i++ ) {
push @cols, $sth->{NAME}->[$i];
}
$columns = join(',', @cols);
#-- execute a query WHERE 1=0 on the existant table
$sql = <<SQL;
SELECT $columns
FROM $schemaname.$snapshotname
WHERE 1=0
SQL
$sth1 = $dbh->prepare($sql);
$sth1->execute;
if ($sth1->err) {
elog ERROR, "Could not execute query: $sql [$sth1->{errstr}]";
}
validateModifyPrebuiltTable($dbh, $schemaname, $snapshotname, $sth, $sth1);
$sth1->finish();
$dbh->commit;
}
$sth->finish();
}
#--
#-- SUB: createRemoteSnapshot
#-- Creates a snapshot based on a remote database query
#--
sub createRemoteSnapshot {
#-- Function parameters
my ($dbh, $schemaname, $snapshotname, $query, $dblinkname, $pbt) = @_;
#-- Local variables
my $dbh_remote;
my $sql;
my $sth;
my $dblink;
my $attr_href;
#--Test if the DBLINK entry exists
$sql = 'SELECT * FROM public.pg_dblinks WHERE dblinkname=?';
$sth = $dbh->prepare($sql);
$sth->execute(($dblinkname));
if ($sth->err) {
elog ERROR, "Table 'public.pg_dblinks' does not exist!";
return FALSE;
}
#-- Fetch the dblink info
$dblink = $sth->fetchrow_hashref;
if (! $dblink->{'dblinkname'}) {
elog ERROR, "DBLink '$dblinkname' does not exist!";
return FALSE;
}
#-- Convert the attributes to a hashtable
$attr_href = eval($dblink->{'attributes'});
#-- Make the remote database connection
$dbh_remote = dbiGetConnection(
$dblink->{'datasource'}
, $dblink->{'username'}
, $dblink->{'password'}
, $attr_href
);
#-- holds each column definition of the new table
my @cols;
#-- retrieve the structure of the snapshot query (empty query)
$sql = getQueryWithNoRecords($dbh_remote, $query);
$sth = $dbh_remote->prepare($sql);
$sth->execute;
if ($pbt eq 'N') {
#-- Loop through all columns and retrieves the name and type of each one
#-- Also maps the SQL Perl types to PostgreSQL ones
#-- Place this info on @cols
for ( my $i = 0 ; $i < $sth->{NUM_OF_FIELDS} ; $i++ ) {
my $line = $sth->{NAME}->[$i];
my $datatype = getUnifiedSqlType($sth->{TYPE}->[$i]);
my $precision = $sth->{PRECISION}->[$i];
my $scale = $sth->{SCALE}->[$i];
my $isnullable = $sth->{NULLABLE}->[$i];
my $nullable = '';
if (! $isnullable) {
$nullable = 'NOT NULL';
}
if ( $datatype == DBI::SQL_INTEGER ) {
$line .= " INTEGER $nullable";
} elsif ( $datatype == DBI::SQL_SMALLINT ) {
$line .= " SMALLINT $nullable";
} elsif ( $datatype == DBI::SQL_NUMERIC ) {
$line .= " NUMERIC($precision, $scale) $nullable";
} elsif ( $datatype == DBI::SQL_BOOLEAN ) {
$line .= " BOOLEAN $nullable";
} elsif ( $datatype == DBI::SQL_CHAR ) {
$line .= " CHAR($precision) $nullable";
} elsif ( $datatype == DBI::SQL_VARCHAR ) {
$line .= " VARCHAR($precision) $nullable";
} elsif ( $datatype == DBI::SQL_CLOB ) {
$line .= " VARCHAR $nullable";
} elsif ( $datatype == DBI::SQL_DATE ) {
$line .= " DATE $nullable";
} elsif ( $datatype == DBI::SQL_TIME ) {
$line .= " TIME $nullable";
} elsif ( $datatype == DBI::SQL_TIMESTAMP ) {
$line .= " TIMESTAMP $nullable";
} elsif ( $datatype == DBI::SQL_INTERVAL ) {
$line .= " INTERVAL $nullable";
} elsif ( $datatype == DBI::SQL_BLOB ) {
$line .= " BYTEA $nullable";
} else { #--UNKNOWN
elog WARNING, "Unknown type '$datatype' mapped to 'TEXT'!";
$line .= " TEXT $nullable";
}
push @cols, $line;
}
$sth->finish;
#-- Makes the CREATE TABLE SQL statement
$sql = <<SQL;
CREATE TABLE $schemaname.$snapshotname (
@{[ join("\n, ", @cols) ]}
)
SQL
elog NOTICE, $sql if $main::DEBUG==1;
$sth = spi_exec_query($sql);
if ($sth->{status} eq 'SPI_OK_UTILITY') {
elog NOTICE, "Snapshot placeholder created" if $main::DEBUG==1;
} else {
elog ERROR, "Could not create snapshot placeholder: $sql [$sth->{status}]";
}
} else {
my $snapshotname = $pbt;
my $sth1;
my $columns;
for ( my $i = 0 ; $i < $sth->{NUM_OF_FIELDS} ; $i++ ) {
push @cols, $sth->{NAME}->[$i];
}
$columns = join(',', @cols);
#-- execute a query WHERE 1=0 on the existant table
$sql = <<SQL;
SELECT $columns
FROM $schemaname.$snapshotname
WHERE 1=0
SQL
$sth1 = $dbh->prepare($sql);
$sth1->execute;
if ($sth1->err) {
elog ERROR, "Could not execute query: $sql [$sth1->{status}]";
}
validateModifyPrebuiltTable($dbh, $schemaname, $snapshotname, $sth, $sth1);
$sth1->finish();
$dbh->commit;
}
$sth->finish();
#-- Disconnects from the remote database
$dbh_remote->disconnect;
}
#-- Dependencies
#--
#-- SUB: dbiGetConnection
#-- Connects to a DBI datasource and returns a connection handle
#--
sub dbiGetConnection {
my ($datasource, $username, $password, $attributes) = @_;
# local variables
my $dbh;
$dbh = DBI->connect(
$datasource
, $username
, $password
, $attributes
);
if ($DBI::errstr) {
elog ERROR, <<ERR;
Could not connect to database
data source: $datasource
user: $username
password: $password
dbh attributes:
$attributes
$DBI::errstr
ERR
}
return $dbh;
}
#--
#-- SUB: getQueryWithNoRecords
#-- Places a FALSE filter into a query in order to retrieve no records (query structure only)
#--
sub getQueryWithNoRecords {
#-- Function parameters
my ($dbh, $query) = @_;
my $sth;
my $sql;
#-- some fake databases do not support subqueries, so we need to modify
#-- the original query and insert "1=0 AND " or "WHERE 1=0" as needed
#-- of course, we try "SELECT * FROM ($query) WHERE 1=0" first,
#-- for databases that support subqueries, or better: REAL databases
$sql = 'SELECT * FROM ($query) query WHERE 1=0';
elog NOTICE, $sql if $main::DEBUG == 1;
$sth = $dbh->prepare($sql);
if ($DBI::errstr) {
$query =~ tr/\n/ /;
my $lc_query = lc($query);
my $wherepos = rindex($lc_query, " where ");
if ($wherepos > 0) {
$sql = substr($query, 0, $wherepos) . ' WHERE 1=0 AND ' . substr($query, $wherepos + length(" where "));
} else {
my $orderbypos = rindex($lc_query, " order by ");
my $groupbypos = rindex($lc_query, " group by ");
if ($orderbypos < $groupbypos) {
$wherepos = $orderbypos;
} else {
$wherepos = $groupbypos;
}
if ($wherepos == -1) {
$wherepos = length($lc_query);
}
$sql = substr($query, 0, $wherepos) . ' WHERE 1=0 ' . substr($query, $wherepos);
}
elog NOTICE, $sql if $main::DEBUG == 1;
$sth = $dbh->prepare($sql);
if ($DBI::errstr) {
my $err = <<ERR2;
Cannot prepare
$sql
$DBI::errstr
ERR2
elog ERROR, $err;
}
}
return $sql;
}
#--
#-- SUB: getUnifiedSqlType
#-- Maps a set of SQL types into a single SQL type
#--
sub getUnifiedSqlType {
my ($dbiType) = @_;
#--------------------------------
#-- SQL datatype codes
#-- SQL_GUID (-11)
#-- SQL_WLONGVARCHAR (-10)
#-- SQL_WVARCHAR (-9)
#-- SQL_WCHAR (-8)
#-- SQL_BIT (-7)
#-- SQL_TINYINT (-6)
#-- SQL_BIGINT (-5)
#-- SQL_LONGVARBINARY (-4)
#-- SQL_VARBINARY (-3)
#-- SQL_BINARY (-2)
#-- SQL_LONGVARCHAR (-1)
#-- SQL_UNKNOWN_TYPE 0
#-- SQL_ALL_TYPES 0
#-- SQL_CHAR 1
#-- SQL_NUMERIC 2
#-- SQL_DECIMAL 3
#-- SQL_INTEGER 4
#-- SQL_SMALLINT 5
#-- SQL_FLOAT 6
#-- SQL_REAL 7
#-- SQL_DOUBLE 8
#-- SQL_DATETIME 9
#-- SQL_DATE 9
#-- SQL_INTERVAL 10
#-- SQL_TIME 10
#-- SQL_TIMESTAMP 11
#-- SQL_VARCHAR 12
#-- SQL_BOOLEAN 16
#-- SQL_UDT 17
#-- SQL_UDT_LOCATOR 18
#-- SQL_ROW 19
#-- SQL_REF 20
#-- SQL_BLOB 30
#-- SQL_BLOB_LOCATOR 31
#-- SQL_CLOB 40
#-- SQL_CLOB_LOCATOR 41
#-- SQL_ARRAY 50
#-- SQL_ARRAY_LOCATOR 51
#-- SQL_MULTISET 55
#-- SQL_MULTISET_LOCATOR 56
#-- SQL_TYPE_DATE 91
#-- SQL_TYPE_TIME 92
#-- SQL_TYPE_TIMESTAMP 93
#-- SQL_TYPE_TIME_WITH_TIMEZONE 94
#-- SQL_TYPE_TIMESTAMP_WITH_TIMEZONE 95
#-- SQL_INTERVAL_YEAR 101
#-- SQL_INTERVAL_MONTH 102
#-- SQL_INTERVAL_DAY 103
#-- SQL_INTERVAL_HOUR 104
#-- SQL_INTERVAL_MINUTE 105
#-- SQL_INTERVAL_SECOND 106
#-- SQL_INTERVAL_YEAR_TO_MONTH 107
#-- SQL_INTERVAL_DAY_TO_HOUR 108
#-- SQL_INTERVAL_DAY_TO_MINUTE 109
#-- SQL_INTERVAL_DAY_TO_SECOND 110
#-- SQL_INTERVAL_HOUR_TO_MINUTE 111
#-- SQL_INTERVAL_HOUR_TO_SECOND 112
#-- SQL_INTERVAL_MINUTE_TO_SECOND 113
#--------------------------------
#--PG_BOOL PG_BYTEA PG_CHAR PG_INT8 PG_INT2 PG_INT4 PG_TEXT PG_OID
#--PG_FLOAT4 PG_FLOAT8 PG_ABSTIME PG_RELTIME PG_TINTERVAL PG_BPCHAR
#--PG_VARCHAR PG_DATE PG_TIME PG_DATETIME PG_TIMESPAN PG_TIMESTAMP
if (($dbiType == DBI::SQL_INTEGER)) {
return DBI::SQL_INTEGER;
} elsif (($dbiType == DBI::SQL_SMALLINT)) {
return DBI::SQL_SMALLINT;
} elsif (($dbiType == DBI::SQL_TYPE_TIME_WITH_TIMEZONE)
|| ($dbiType == DBI::SQL_TYPE_TIMESTAMP_WITH_TIMEZONE)
|| ( $dbiType == DBI::SQL_DATETIME )
|| ( $dbiType == DBI::SQL_TIMESTAMP )
|| ( $dbiType == DBI::SQL_TYPE_TIMESTAMP ) ) { #--TIMESTAMP
return DBI::SQL_TIMESTAMP;
} elsif (($dbiType == DBI::SQL_NUMERIC )
|| ($dbiType == DBI::SQL_DECIMAL )
|| ($dbiType == DBI::SQL_FLOAT )
|| ($dbiType == DBI::SQL_REAL )
|| ($dbiType == DBI::SQL_DOUBLE )
#-- || ($dbiType == DBI::SQL_BIGINT )
|| ($dbiType == DBI::SQL_TINYINT )) { #--NUMERIC
return DBI::SQL_NUMERIC;
} elsif ( ( $dbiType == DBI::SQL_BOOLEAN )
|| ( $dbiType == DBI::SQL_BIT ) ) { #--BOOLEAN
return DBI::SQL_BOOLEAN;
} elsif ( ($dbiType == DBI::SQL_CHAR )
|| ( $dbiType == DBI::SQL_WCHAR ) ) { #--CHAR
return DBI::SQL_CHAR;
} elsif ( ($dbiType == DBI::SQL_VARCHAR )
|| ( $dbiType == DBI::SQL_WVARCHAR )) { #--VARCHAR
return DBI::SQL_VARCHAR;
} elsif ( ( $dbiType == DBI::SQL_LONGVARCHAR )
|| ( $dbiType == DBI::SQL_WLONGVARCHAR )
|| ( $dbiType == DBI::SQL_CLOB ) ) { #--CLOB
return DBI::SQL_CLOB;
} elsif ( ( $dbiType == DBI::SQL_DATE )
|| ( $dbiType == DBI::SQL_TYPE_DATE ) ) { #--DATE
return DBI::SQL_DATE;
} elsif ( ( $dbiType == DBI::SQL_TIME )
|| ( $dbiType == DBI::SQL_TYPE_TIME ) ) { #--TIME
return DBI::SQL_TIME;
} elsif ( ( $dbiType == DBI::SQL_INTERVAL )
|| ( $dbiType == DBI::SQL_INTERVAL_YEAR )
|| ( $dbiType == DBI::SQL_INTERVAL_MONTH )
|| ( $dbiType == DBI::SQL_INTERVAL_DAY )
|| ( $dbiType == DBI::SQL_INTERVAL_HOUR )
|| ( $dbiType == DBI::SQL_INTERVAL_MINUTE )
|| ( $dbiType == DBI::SQL_INTERVAL_SECOND )
|| ( $dbiType == DBI::SQL_INTERVAL_YEAR_TO_MONTH )
|| ( $dbiType == DBI::SQL_INTERVAL_DAY_TO_HOUR )
|| ( $dbiType == DBI::SQL_INTERVAL_DAY_TO_MINUTE )
|| ( $dbiType == DBI::SQL_INTERVAL_DAY_TO_SECOND )
|| ( $dbiType == DBI::SQL_INTERVAL_HOUR_TO_MINUTE )
|| ( $dbiType == DBI::SQL_INTERVAL_HOUR_TO_SECOND )
|| ( $dbiType == DBI::SQL_INTERVAL_MINUTE_TO_SECOND ) ) { #--INTERVAL
return DBI::SQL_INTERVAL;
} elsif ( ( $dbiType == DBI::SQL_BINARY )
|| ( $dbiType == DBI::SQL_VARBINARY )
|| ( $dbiType == DBI::SQL_LONGVARBINARY )
|| ( $dbiType == DBI::SQL_BLOB ) ) { #--BLOB
return $dbiType == DBI::SQL_BLOB;
} else {
return DBI::SQL_VARCHAR; #-- default: VARCHAR
}
}
sub validateModifyPrebuiltTable {
#-- Function parameters
my ($dbh, $schemaname, $snapshotname, $sth_source, $sth_target) = @_;
#-- Local variables
my $sql;
my $sth;
#-- compare column names, types and position
for ( my $i = 0 ; $i < $sth_source->{NUM_OF_FIELDS} ; $i++ ) {
my $columnName = $sth_source->{NAME}->[$i];
if (getUnifiedSqlType($sth_source->{TYPE}->[$i]) ne getUnifiedSqlType($sth_target->{TYPE}->[$i])) {
elog ERROR, "Types of column $columnName mismatch";
}
if ($sth_source->{PRECISION}->[$i] > $sth_target->{PRECISION}->[$i]) {
elog ERROR, "Precision of column $columnName on source exceeds precision on target";
}
if ($sth_source->{SCALE}->[$i] > $sth_target->{SCALE}->[$i]) {
elog ERROR, "Scale of column $columnName on source exceeds scale on target";
}
if ( $sth_source->{NULLABLE}->[$i] ne $sth_target->{NULLABLE}->[$i] ) {
my $n;
my $n1;
if (! $sth_source->{NULLABLE}->[$i]) {
$n = 'NOT NULLABLE';
$n1 = 'NULLABLE';
} else {
$n = 'NULLABLE';
$n1 = 'NOT NULLABLE';
}
elog WARNING, "Column $columnName is $n on source but is $n1 on target";
}
}
elog NOTICE, "Prebuilt table matches source query." if $main::DEBUG==1;
#-- tests if the $pbt column exists
$sql = <<SQL;
SELECT pbt\$
FROM $schemaname.$snapshotname
WHERE 1=0
SQL
$sth = $dbh->prepare($sql);
$sth->execute();
my $err = $sth->err;
$sth->finish();
$dbh->commit;
if ($err) {
#-- column $pbt does not exist
$sql = <<SQL;
ALTER TABLE $schemaname.$snapshotname
ADD COLUMN pbt\$ NUMERIC default 0
SQL
$dbh->do($sql);
if ($dbh->err) {
elog ERROR, "Could not add the 'pbt\$' column to $schemaname.$snapshotname. SQL=$sql. [".$dbh->errstr.']';
}
} else {
elog WARNING, "Column \$pbt already exists on $schemaname.$snapshotname";
}
$sql = <<SQL;
UPDATE $schemaname.$snapshotname
SET pbt\$=0
WHERE pbt\$ IS NULL
SQL
$dbh->do($sql);
if ($dbh->err) {
elog ERROR, "Could not set 'pbt\$' column on existing rows to 0 on $schemaname.$snapshotname. SQL=$sql. [".$dbh->errstr.']';
}
$dbh->commit;
$sql = <<SQL;
ALTER TABLE $schemaname.$snapshotname
ALTER COLUMN pbt\$ SET NOT NULL
SQL
$dbh->do($sql);
if ($dbh->err) {
elog ERROR, "Could not modify the 'pbt\$' column to NOT NULL on $schemaname.$snapshotname. SQL=$sql. [".$dbh->errstr.']';
}
$sql = <<SQL;
CREATE INDEX ${snapshotname}_pbt\$_ix
ON $schemaname.$snapshotname (pbt\$)
SQL
$dbh->do($sql);
if ($dbh->err) {
elog WARNING, "Could not create index on 'pbt\$' column on $schemaname.$snapshotname. SQL=$sql. [".$dbh->errstr.']';
}
$dbh->commit;
$sql = <<SQL;
CREATE OR REPLACE FUNCTION $schemaname.${snapshotname}_pbt\$_trgfn()
RETURNS trigger AS
\$BODYFN\$
BEGIN
IF (TG_OP = 'UPDATE') THEN
IF ((OLD.pbt\$ <> 0) OR (NEW.pbt\$ <> 0)) THEN
RAISE EXCEPTION 'Invalid operation on snapshot-based data!';
END IF;
ELSIF (TG_OP = 'DELETE') THEN
IF (OLD.pbt\$ <> 0) THEN
RAISE EXCEPTION 'Invalid operation on snapshot-based data!';
END IF;
ELSIF (TG_OP = 'INSERT') THEN
IF ((NEW.pbt\$ IS NOT NULL) AND (NEW.pbt\$ <> 0)) THEN
RAISE EXCEPTION 'You cannot insert snapshot-based data manually!';
END IF;
END IF;
RETURN NULL;
END;
\$BODYFN\$
LANGUAGE 'plpgsql' VOLATILE;
SQL
elog NOTICE, $sql if $main::DEBUG==1;
$dbh->do($sql);
if ($dbh->err) {
elog ERROR, "Could not create the prebuilt table trigger function for table '$schemaname.$snapshotname'. ERR='".$dbh->errstr."' SQL='$sql'";
}
#-- Create TRIGGER on the prebuilt table
elog NOTICE, "Creating trigger ${snapshotname}_pbt\$_trg on $schemaname.${snapshotname}" if $main::DEBUG==1;
$sql = <<SQL;
CREATE TRIGGER ${snapshotname}_pbt\$_trg AFTER INSERT OR UPDATE OR DELETE
ON $schemaname.${snapshotname} FOR EACH ROW
EXECUTE PROCEDURE $schemaname.${snapshotname}_pbt\$_trgfn ()
SQL
$dbh->do($sql);
if ($dbh->err) {
elog WARNING, "Could not create the prebuilt table trigger on table '$schemaname.$snapshotname'. ERR='".$dbh->errstr."' SQL='$sql'";
}
}
$BODY$
LANGUAGE 'plperlu' VOLATILE;
ALTER FUNCTION public.create_snapshot(schemaname text, snapshotname text, query text, dblink text, kind text, pbt_table text) OWNER TO postgres;
COMMENT ON FUNCTION public.create_snapshot(schemaname text, snapshotname text, query text, dblink text, kind text, pbt_table text) IS $$
This function is part of PostgreSQL::Snapshots project.
This is the function that creates a snapshot. It only creates the snapshot placeholder with the same structure as the query result. The snapshot needs to be refresh to become filled.
$$;
--
-- ATTENTION: This is a free software.
-- View the LICENSE.txt file for license information
--
------------------------------------------------------------------------------
-- FUNCTION: public.drop_snapshot
--
-- Removes a SNAPSHOT previously created with create_snapshot
------------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION public.drop_snapshot(schemaname text, snapshotname text)
RETURNS bool AS
$BODY$
use strict;
use DBI;
use constant TRUE => 1;
use constant FALSE => "";
#-- Function parameters
my ($schemaname, $snapshotname) = @_;
$schemaname = lc($schemaname);
$snapshotname = lc($snapshotname);
#-- Set this to 1 for debugging messages
$main::DEBUG=0;
#-- Localhost superuser connection
my $dbh_local = DBI->connect('dbi:Pg:dbname=labpeTeste', 'postgres', undef, {AutoCommit => 0});
my $dbh_remote;
my $sql;
my $sth;
my $row;
my $masterschema;
my $mastername;
my $snapshot;
my $dblink;
my $attr_href;
my $affected;
my $total;
if (! snapshotExists($dbh_local, $schemaname, $snapshotname)) {
elog ERROR, "Snapshot '$schemaname.$snapshotname' does not exist";
}
my $snapshot = getSnapshot($dbh_local, $schemaname, $snapshotname);
if ("$snapshot->{'dblinkid'}" eq '') {
#-- refresh local
$dbh_remote = $dbh_local;
} else {
$dblink = getDblinkById($dbh_local, $snapshot->{'dblinkid'});
if ($dblink) {
#--create the remote database connection
$attr_href = eval($dblink->{'attributes'});
$dbh_remote = dbiGetConnection(
$dblink->{'datasource'}
, $dblink->{'username'}
, $dblink->{'password'}
, $attr_href
);
} else {
$dbh_remote = $dbh_local;
}
}
($masterschema, $mastername) = retrieveMasterForSnapshot($dbh_remote, $snapshot);
if (snapshotLogExists($dbh_remote, $masterschema, $mastername)) {
call_driver_function($dbh_remote, 'snapshot_do', 'UNREGISTER', $masterschema, $mastername, $snapshot->{'snapid'});
}
if ("$snapshot->{'dblinkid'}" ne '') {
$dbh_remote->commit;
$dbh_remote->disconnect;
}
#-- Then delete the SNAPSHOT entry in public.pg_snapshots
$sql = <<SQL;
DELETE FROM public.pg_snapshots
WHERE schemaname=?
AND snapshotname=?
SQL
$sth = $dbh_local->prepare($sql);
$sth->execute(($schemaname, $snapshotname));
if (! $sth->err) {
elog NOTICE, "Snapshot entry removed" if $main::DEBUG==1;
} else {
elog ERROR, "Could not remove Snapshot entry '$schemaname.$snapshotname'. $sth->{errstr}";
}
if ("$snapshot->{'pbt_table'}" ne '') {
my $pbt_table = $snapshot->{'pbt_table'};
my $row;
my $sth;
setTriggerStatus($dbh_local, $snapshot->{'schemaname'}, $snapshot->{'pbt_table'}, "$snapshot->{'pbt_table'}_pbt\$_trg", FALSE);
#-- Remove all rows based on this snapshot
$sql = <<SQL;
DELETE FROM $schemaname.${pbt_table}
WHERE pbt\$ = ?
SQL
$sth = $dbh_local->prepare($sql);
$affected = $sth->execute(($snapshot->{'snapid'}));
if ($sth->err) {
elog ERROR, "Could not delete snapshot based rows from prebuilt table. SQL=$sql.".$sth->errstr;
}
$dbh_local->commit;
elog NOTICE, "Prebuilt table snapshot rows deleted: $affected" if $main::DEBUG==1;
#-- Find out whether we are the last snapshot using this prebuilt table
$sql = <<SQL;
SELECT count(*) as total
FROM public.pg_snapshots
WHERE schemaname=?
AND pbt_table=?
SQL
my ($total) = sqlLookup($dbh_local, $sql, ($schemaname, $pbt_table));
if ($total eq 0) {
elog NOTICE, "We are the last snapshot on this prebuilt table!" if $main::DEBUG==1;
#-- Drop pbt$ column index
$sql = <<SQL;
DROP INDEX ${pbt_table}_pbt\$_ix
SQL
$dbh_local->do($sql);
if ($dbh_local->err) {
elog WARNING, "Could not drop the prebuilt table pbt\$ column index on table '$schemaname.$pbt_table'. ERR='".$dbh_local->errstr."' SQL='$sql'";
}
elog NOTICE, "Prebuilt table pbt\$ column index dropped." if $main::DEBUG==1;
#-- Drop pbt$ column on Prebuilt table
$sql = <<SQL;
ALTER TABLE $schemaname.${pbt_table}
DROP COLUMN pbt\$
SQL
$dbh_local->do($sql);
if ($dbh_local->err) {
elog ERROR, "Could not drop the prebuilt table pbt\$ column on table '$schemaname.$pbt_table'. ERR='".$dbh_local->errstr."' SQL='$sql'";
}
elog NOTICE, "Prebuilt table pbt\$ column dropped." if $main::DEBUG==1;
#-- Drop triggers on Prebuilt table
$sql = <<SQL;
DROP TRIGGER ${pbt_table}_pbt\$_trg ON $schemaname.${pbt_table}
SQL
$dbh_local->do($sql);
if ($dbh_local->err) {
elog ERROR, "Could not drop the prebuilt table trigger on table '$schemaname.$pbt_table'. ERR='".$dbh_local->errstr."' SQL='$sql'";
}
elog NOTICE, "Prebuilt table trigger dropped." if $main::DEBUG==1;
#-- Drop trigger's function
$sql = <<SQL;
DROP FUNCTION $schemaname.${pbt_table}_pbt\$_trgfn();
SQL
$dbh_local->do($sql);
if ($dbh_local->err) {
elog ERROR, "Could not drop the prebuilt table trigger function '$schemaname.${pbt_table}_trgfn()'. ERR='".$dbh_local->errstr."' SQL='$sql'";
}
elog NOTICE, "Prebuilt table trigger function dropped." if $main::DEBUG==1;
} else {
setTriggerStatus($dbh_local, $snapshot->{'schemaname'}, $snapshot->{'pbt_table'}, "$snapshot->{'pbt_table'}_pbt\$_trg", TRUE);
}
} else {
#-- Actually drop the SNAPSHOT placeholder
$sql = <<SQL;
DROP TABLE $schemaname.$snapshotname
SQL
$sth = spi_exec_query($sql);
if ($sth->{status} eq 'SPI_OK_UTILITY') {
elog NOTICE, "Snapshot dropped" if $main::DEBUG==1;
} else {
elog ERROR, "Could not drop Snapshot '$schemaname.$snapshotname'. $sth->{status}";
}
}
$dbh_local->commit;
$dbh_local->disconnect;
#-- All done. Let's return TRUE
return TRUE;
#--
#-- SUB: sqlLookup
#-- Returns a one row result from a SQL query
#--
sub sqlLookup {
my ($dbh, $sql, @params) = @_;
return sqlLookup2 ($dbh, $sql, \@params, TRUE);
}
sub sqlLookupSoft {
my ($dbh, $sql, @params) = @_;
return sqlLookup2 ($dbh, $sql, \@params, FALSE);
}
sub sqlLookup2 {
my ($dbh, $sql, $params, $errorsAreFatal) = @_;
my $sth;
my @row;
elog NOTICE, "Lookup at SQL:$sql with ".((@$params ne 0) ? join(',', @$params) : '<none>') if $main::DEBUG==1;
$sth = $dbh->prepare($sql);
if (@$params ne 0) {
$sth->execute(@$params);
} else {
$sth->execute();
}
if ($sth->err) {
if ($errorsAreFatal) {
elog ERROR, "Fatal error! SQL=$sql ERROR=".$sth->errstr;
}
}
my @row = $sth->fetchrow_array;
$sth->finish;
return @row;
}
#--
#-- SUB: snapshotExists
#-- Returns whether a snapshot exists or not
#--
sub snapshotExists {
#-- Function parameters
my ($dbh, $schemaname, $snapshotname) = @_;
# local variables
my $sql;
my $sth;
my $row;
$sql = <<SQL;
SELECT count(*) as total
FROM public.pg_snapshots
WHERE schemaname=?
and snapshotname=?
SQL
$sth = $dbh->prepare($sql);
$sth->execute(($schemaname, $snapshotname));
if ($sth->err) {
elog ERROR, "Could not find public.pg_snapshots !!!";
}
$row = $sth->fetchrow_hashref;
return ($row->{'total'} ne 0);
}
#--
#-- SUB: getSnapshot
#-- Returns the named SNAPSHOT
#--
sub getSnapshot {
#-- Function parameters
my ($dbh, $schemaname, $snapshotname) = @_;
#-- Local variables
my $sql;
my $sth;
#-- Get snapshot info
$sql = 'SELECT * FROM public.pg_snapshots WHERE schemaname=? AND snapshotname=?';
$sth = $dbh->prepare($sql);
$sth->execute(($schemaname, $snapshotname));
return $sth->fetchrow_hashref;
}
#--
#-- SUB: getDblinkById
#-- Returns a DBLINK by its ID
#--
sub getDblinkById {
#-- Function parameters
my ($dbh, $dblinkid) = @_;
#-- Local variables
my $sql;
my $sth;
#--get dblink info
$sql = 'SELECT * FROM public.pg_dblinks WHERE dblinkid=?';
$sth = $dbh->prepare($sql);
$sth->execute(($dblinkid));
if ($sth->err) {
elog ERROR, "DBLINK '$dblinkid' does not exist!";
}
return $sth->fetchrow_hashref;
}
#--
#-- SUB: dbiGetConnection
#-- Connects to a DBI datasource and returns a connection handle
#--
sub dbiGetConnection {
my ($datasource, $username, $password, $attributes) = @_;
# local variables
my $dbh;
$dbh = DBI->connect(
$datasource
, $username
, $password
, $attributes
);
if ($DBI::errstr) {
elog ERROR, <<ERR;
Could not connect to database
data source: $datasource
user: $username
password: $password
dbh attributes:
$attributes
$DBI::errstr
ERR
}
return $dbh;
}
#--
#-- SUB: retrieveMasterForSnapshot
#-- Retrieve the MASTER tables for a SNAPSHOT
#--
sub retrieveMasterForSnapshot {
#-- Function parameters
my ($dbh, $snapshot) = @_;
#-- Local variables
my $masterschema;
my $mastername;
my $sql;
my $sth;
my $relatedCount;
my $tablename;
my @related=();
my @tables=();
$tablename = $snapshot->{'query'};
@related = $tablename =~ m/(from|join) +((([0-9a-z_\$]+|".+")\.)?([0-9a-z_\$]+|".+")( *, *(([0-9a-z_\$]+|".+")\.)?([0-9a-z_\$]+|".+"))*)/gi;
for (my $i = 1; $i < @related; $i += 9) {
push(@tables, @related[$i]);
}
if (@tables eq 0) {
elog ERROR, 'Origin TABLES not found !';
}
if (@tables eq 1) {
my $tablename = @tables[0];
elog NOTICE, "TABLE=[$tablename]" if $main::DEBUG==1;
($masterschema, $mastername) = split(/\./, $tablename);
elog NOTICE, "MASTERSCHEMA=[$masterschema] MASTERNAME=[$mastername]" if $main::DEBUG==1;
if ($mastername eq undef) {
#-- No Schema
elog NOTICE, 'Sorry, due to the complexity of your query, I could not guess the Schema name of your tables, so I will try a COMPLETE REFRESH instead.';
$mastername = $masterschema;
$snapshot->{'kind'} = 'C';
$masterschema = undef;
}
return ($masterschema, $mastername);
}
if (($snapshot->{'kind'} eq 'F') || ($snapshot->{'kind'} eq 'R')) {
elog NOTICE, "Refresh FAST only work with one-table only queries. Falling back to COMPLETE REFRESH. TABLES=[" . join(',', @tables) . ']';
$snapshot->{'kind'} = 'C';
}
return (undef, undef);
}
#--
#-- SUB: snapshotLogExists
#-- Returns whether a master object has a SNAPSHOT LOG
#--
sub snapshotLogExists {
#-- Function parameters
my ($dbh, $masterschema, $mastername) = @_;
my $result = call_driver_function($dbh, 'snapshotlog_exists', undef, $masterschema, $mastername, undef);
return ($result eq 'T');
}
#--
#-- SUB: call_driver_function
#-- Calls a driver function (native) on the database pointed by the database handle ($dbh)
#--
sub call_driver_function {
my ($dbh, $function_name, $operation, $masterschema, $mastername, $additional) = @_;
$masterschema = uc($masterschema);
$mastername = uc($mastername);
elog NOTICE, "Calling DRIVER function $function_name ($operation, $masterschema, $mastername, $additional)" if $main::DEBUG==1;
if ($function_name eq 'snapshot_do') {
dbi_execute_stored_procedure($dbh, 'snapshot_do', ('123456', $operation, $masterschema, $mastername, $additional));
} elsif ($function_name eq 'snapshotlog_exists') {
return dbi_call_function($dbh, 'snapshotlog_exists', ($masterschema, $mastername));
} elsif ($function_name eq 'last_log_refresh') {
return dbi_call_function($dbh, 'last_log_refresh' , ($masterschema, $mastername, $additional));
} elsif ($function_name eq 'snapshotlog_columns') {
return dbi_call_function($dbh, 'snapshotlog_columns', ($masterschema, $mastername));
} elsif ($function_name eq 'snapshotlog_name') {
return dbi_call_function($dbh, 'snapshotlog_name', ($masterschema, $mastername));
} elsif ($function_name eq 'count_log_modified_rows') {
return dbi_call_function($dbh, 'count_log_modified_rows', ($masterschema, $mastername, $additional));
} elsif ($function_name eq 'snapshotlog_ud_filter') {
return dbi_call_function($dbh, 'snapshotlog_ud_filter', ($additional));
} elsif ($function_name eq 'snapshotlog_iu_filter') {
return dbi_call_function($dbh, 'snapshotlog_iu_filter', ($additional));
} else {
elog ERROR, "Could not call DRIVER function: $function_name";
}
$dbh->commit;
}
sub dbi_execute_stored_procedure {
use DBI::Const::GetInfoType;
my ($dbh, $procname, @parameters) = @_;
my $dbname = lc($dbh->get_info( $GetInfoType{SQL_DBMS_NAME} ));
my $sql;
my $sth;
my $qmarks = '?,' x @parameters;
chop $qmarks;
if ($dbname eq 'oracle') {
$sql = "BEGIN $procname($qmarks); END;";
} elsif ($dbname eq 'postgresql') {
$sql = "SELECT $procname($qmarks)";
} else {
$sql = "SELECT $procname($qmarks)";
}
$sth = $dbh->prepare($sql);
if ($dbh->err) {
elog ERROR, $dbh->errstr."' SQL='$sql'";
}
$sth->execute(@parameters);
if ($sth->err) {
elog ERROR, $sth->errstr."' SQL='$sql'";
}
}
sub dbi_call_function {
use DBI::Const::GetInfoType;
my ($dbh, $funcname, @parameters) = @_;
my $dbname = lc($dbh->get_info( $GetInfoType{SQL_DBMS_NAME} ));
my $sql;
my $sth;
my @row;
my $qmarks = '?,' x @parameters;
chop $qmarks;
if ($dbname eq 'oracle') {
$sql = "SELECT $funcname($qmarks) FROM DUAL";
} elsif ($dbname eq 'postgresql') {
$sql = "SELECT $funcname($qmarks)";
} else {
$sql = "SELECT $funcname($qmarks)";
}
$sth = $dbh->prepare($sql);
if ($dbh->err) {
elog ERROR, $dbh->errstr."' SQL='$sql'";
}
$sth->execute(@parameters);
if ($sth->err) {
elog ERROR, $sth->errstr."' SQL='$sql'";
}
@row = $sth->fetchrow_array;
my $result = @row[0];
return $result;
}
#--
#-- SUB: setTriggerStatus
#-- Enable/Disable a trigger
#--
sub setTriggerStatus {
#-- Function parameters
my ($dbh, $schemaname, $tablename, $triggername, $status) = @_;
#-- Local variables
my $sql;
my $sth;
my $tgenabled = ($status == TRUE) ? 'TRUE' : 'FALSE';
$sql = <<SQL;
SELECT c.oid
FROM pg_class c
LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname=?
AND c.relname=?
SQL
my ($tableid) = sqlLookup($dbh, $sql, ($schemaname, $tablename));
if ("$tableid" eq "") {
elog ERROR, "Could not retrieve TABLEID for table $schemaname.$tablename";
}
elog NOTICE, "Disabling trigger for table ID=$tableid NAME=$triggername" if $main::DEBUG==1;
$sql = <<SQL;
UPDATE pg_trigger
SET tgenabled = $tgenabled
WHERE tgrelid=?
AND tgname=?
SQL
elog NOTICE, $sql if $main::DEBUG==1;
$sth = $dbh->prepare($sql);
$sth->execute(($tableid, $triggername));
#-- Dummy update to refresh new trigger status
if ($sth->err) {
elog ERROR, $sth->errstr;
}
$sql = <<SQL;
UPDATE pg_class
SET relname=relname
WHERE oid=?
SQL
$sth = $dbh->prepare($sql);
$sth->execute(($tableid));
if ($sth->err) {
elog ERROR, $sth->errstr;
}
}
$BODY$
LANGUAGE 'plperlu' VOLATILE;
ALTER FUNCTION public.drop_snapshot(schemaname text, snapshotname text) OWNER TO postgres;
COMMENT ON FUNCTION public.drop_snapshot(schemaname text, snapshotname text) IS $$
This function is part of PostgreSQL::Snapshots project.
This is the function that removes a snapshot.
$$;
--
-- ATTENTION: This is a free software.
-- View the LICENSE.txt file for license information
--
------------------------------------------------------------------------------
-- FUNCTION: public.refresh_snapshot
--
-- Refreshes(Fills) a previously created SNAPSHOT
-- It can refresh using the FORCE, FAST or COMPLETE method
------------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION public.refresh_snapshot(schemaname text, snapshotname text)
RETURNS bool AS
$BODY$
use strict;
use DBI;
use constant TRUE => 1;
use constant FALSE => "";
#-- Function parameters
my ($schemaname, $snapshotname) = @_;
$schemaname = lc($schemaname);
$snapshotname = lc($snapshotname);
#-- Set this to 1 for debugging messages
$main::DEBUG=0;
#-- Localhost superuser connection
my $dbh_local = DBI->connect('dbi:Pg:dbname=labpeTeste', 'postgres', undef, {AutoCommit => 0});
#-- Remote database connection
my $dbh;
#-- Start time of this function
my $start_time = time();
my $sql = '';
my $rs;
my $row;
#-- Number of records refreshed
my $recs;
if (! snapshotExists($dbh_local, $schemaname, $snapshotname)) {
elog ERROR, "Snapshot '$schemaname.$snapshotname' does not exist";
}
my $snapshot = getSnapshot($dbh_local, $schemaname, $snapshotname);
my $snapname = ("$snapshot->{'pbt_table'}" eq '') ? $snapshotname : $snapshot->{'pbt_table'};
#-- Test for object's privileges
my ($hasInsert, $hasDelete) = testObjectPrivileges($schemaname,$snapname,('insert', 'delete'));
if (! $hasInsert) {
elog ERROR, "You don't have INSERT privilege on '$schemaname.$snapshotname' SNAPSHOT";
}
if (! $hasDelete) {
elog ERROR, "You don't have DELETE privilege on '$schemaname.$snapshotname' SNAPSHOT";
}
if ("$snapshot->{'dblinkid'}" eq '') {
#-- refresh local
$recs = refreshSnapshot($dbh_local, $snapshot, undef);
} else {
#-- refresh remote
$recs = refreshSnapshot($dbh_local, $snapshot, getDblinkById($dbh_local, $snapshot->{'dblinkid'}));
}
#-- Compute the elapsed time
my $stop_time = time();
my $secs = $stop_time - $start_time;
elog NOTICE, "Refreshed $recs records in $secs seconds.";
$sql = "UPDATE public.pg_snapshots SET elapsedtime=? WHERE schemaname=? and snapshotname=?";
$rs=$dbh_local->prepare($sql);
$rs->execute(($secs, $schemaname, $snapshotname));
if ($rs->err) {
elog NOTICE, "Could not update snapshot '$snapshot->{schemaname}.$snapshot->{snapshotname}' information. Error:" . $rs->errstr;
}
$dbh_local->commit;
$dbh_local->disconnect;
#-- Renew internal statistics about the refreshed object
$dbh_local = DBI->connect('dbi:Pg:dbname=labpeTeste', 'postgres', undef, {AutoCommit => 1});
if (! vacuum($dbh_local, $snapshot->{schemaname}, $snapname)) {
elog NOTICE, "Could not vacuum snapshot '$schemaname.$snapname':" . $dbh_local->errstr;
}
$dbh_local->commit;
$dbh_local->disconnect;
#-- All done. Let's return TRUE
return TRUE;
#--
#-- SUB: sqlLookup
#-- Returns a one row result from a SQL query
#--
sub sqlLookup {
my ($dbh, $sql, @params) = @_;
return sqlLookup2 ($dbh, $sql, \@params, TRUE);
}
sub sqlLookupSoft {
my ($dbh, $sql, @params) = @_;
return sqlLookup2 ($dbh, $sql, \@params, FALSE);
}
sub sqlLookup2 {
my ($dbh, $sql, $params, $errorsAreFatal) = @_;
my $sth;
my @row;
elog NOTICE, "Lookup at SQL:$sql with ".((@$params ne 0) ? join(',', @$params) : '<none>') if $main::DEBUG==1;
$sth = $dbh->prepare($sql);
if (@$params ne 0) {
$sth->execute(@$params);
} else {
$sth->execute();
}
if ($sth->err) {
if ($errorsAreFatal) {
elog ERROR, "Fatal error! SQL=$sql ERROR=".$sth->errstr;
}
}
my @row = $sth->fetchrow_array;
$sth->finish;
return @row;
}
#--
#-- SUB: snapshotExists
#-- Returns whether a snapshot exists or not
#--
sub snapshotExists {
#-- Function parameters
my ($dbh, $schemaname, $snapshotname) = @_;
# local variables
my $sql;
my $sth;
my $row;
$sql = <<SQL;
SELECT count(*) as total
FROM public.pg_snapshots
WHERE schemaname=?
and snapshotname=?
SQL
$sth = $dbh->prepare($sql);
$sth->execute(($schemaname, $snapshotname));
if ($sth->err) {
elog ERROR, "Could not find public.pg_snapshots !!!";
}
$row = $sth->fetchrow_hashref;
return ($row->{'total'} ne 0);
}
#--
#-- SUB: testObjectPrivileges
#-- Tests the passed privileges of the current user against the target databse object and returns a result array
#--
sub testObjectPrivileges {
#-- Function parameters
my ($schemaname,$snapshotname,@privileges) = @_;
#-- Local variables
my $sql;
my $sth;
my @result=();
#-- Create the query
$sql = 'SELECT ';
for (my $i=0; $i < @privileges; ++$i) {
my $privilege = @privileges[$i];
$sql = "$sql has_table_privilege('$schemaname.$snapshotname', '$privilege') as has_$privilege,";
}
chop $sql;
elog NOTICE, $sql if $main::DEBUG==1;
$sth = spi_exec_query($sql);
for (my $i=0; $i < @privileges; ++$i) {
my $privilege = @privileges[$i];
@result[$i] = $sth->{rows}[0]->{"has_${privilege}"} eq 't';
}
elog NOTICE, 'PRIVILEGES:('.join(',', @privileges).')=('.join(',', @result).')' if $main::DEBUG==1;
return @result;
}
#--
#-- SUB: getSnapshot
#-- Returns the named SNAPSHOT
#--
sub getSnapshot {
#-- Function parameters
my ($dbh, $schemaname, $snapshotname) = @_;
#-- Local variables
my $sql;
my $sth;
#-- Get snapshot info
$sql = 'SELECT * FROM public.pg_snapshots WHERE schemaname=? AND snapshotname=?';
$sth = $dbh->prepare($sql);
$sth->execute(($schemaname, $snapshotname));
return $sth->fetchrow_hashref;
}
#--
#-- SUB: refreshSnapshot
#-- Refresh a SNAPSHOT with a database query result
#--
#-- Perl trim function to remove whitespace from the start and end of the string
sub trim($)
{
my $string = shift;
$string =~ s/^\s+//;
$string =~ s/\s+$//;
return $string;
}
#-- Returns the current date/time
sub getNow {
my $now = trim(`date "+%Y-%m-%d %H:%M:%S"`);
return $now;
}
sub refreshSnapshot {
#-- Function parameters
my ($dbh_local, $snapshot, $dblink) = @_;
#-- Local variables
my $recs = 0;
my $attr_href;
my $dbh_remote;
my $masterschema;
my $mastername;
my $kind;
my $lastRefresh;
my $refreshTime;
if ($dblink) {
#--create the remote database connection
$attr_href = eval($dblink->{'attributes'});
$dbh_remote = dbiGetConnection(
$dblink->{'datasource'}
, $dblink->{'username'}
, $dblink->{'password'}
, $attr_href
);
} else {
$dbh_remote = $dbh_local;
}
$dbh_remote->{LongReadLen} = 64 * 1024; #--LOB <= 64Kb
if ($snapshot->{'kind'} eq 'C') {
$masterschema = undef;
$mastername = undef;
$kind = 'C';
$lastRefresh = undef;
} else {
($masterschema, $mastername, $kind, $lastRefresh) = prepareForRefresh($dbh_remote, $snapshot);
}
$refreshTime = getNow();
if ($kind eq 'C') {
#-- Refresh Complete
$recs = performCompleteRefresh($dbh_local, $dbh_remote, $snapshot);
} else {
#-- Refresh FAST
$recs = performFastRefresh($dbh_local, $dbh_remote, $snapshot, $masterschema, $mastername, $lastRefresh);
}
finalizeRefresh($dbh_remote, $masterschema, $mastername, $snapshot->{'snapid'}, $refreshTime);
if (snapshotLogExists($dbh_remote, $masterschema, $mastername)) {
elog NOTICE, 'Purging Log' if $main::DEBUG==1;
purgeSnapshotLog($dbh_remote, $masterschema, $mastername);
}
if ($recs eq '0E0') {
$recs = 0;
}
if ($dblink) {
$dbh_remote->disconnect;
}
return $recs;
}
#--
#-- SUB: getDblinkById
#-- Returns a DBLINK by its ID
#--
sub getDblinkById {
#-- Function parameters
my ($dbh, $dblinkid) = @_;
#-- Local variables
my $sql;
my $sth;
#--get dblink info
$sql = 'SELECT * FROM public.pg_dblinks WHERE dblinkid=?';
$sth = $dbh->prepare($sql);
$sth->execute(($dblinkid));
if ($sth->err) {
elog ERROR, "DBLINK '$dblinkid' does not exist!";
}
return $sth->fetchrow_hashref;
}
#--
#-- SUB: vacuum
#-- Performs a VACUUM ANALYZE on a target object
#--
sub vacuum {
#-- Function parameters
my ($dbh, $schemaname, $snapshotname) = @_;
#-- Local variables
my $sql;
$sql = "VACUUM ANALYZE $schemaname.$snapshotname";
$dbh->do($sql);
return (! $dbh->err);
}
#--Dependencies
#--
#-- SUB: dbiGetConnection
#-- Connects to a DBI datasource and returns a connection handle
#--
sub dbiGetConnection {
my ($datasource, $username, $password, $attributes) = @_;
# local variables
my $dbh;
$dbh = DBI->connect(
$datasource
, $username
, $password
, $attributes
);
if ($DBI::errstr) {
elog ERROR, <<ERR;
Could not connect to database
data source: $datasource
user: $username
password: $password
dbh attributes:
$attributes
$DBI::errstr
ERR
}
return $dbh;
}
#--
#-- SUB: prepareForRefresh
#-- Prepares the local SNAPSHOT for the refresh process, removing old rows, old logs, etc.
#--
sub prepareForRefresh {
#-- Function parameters
my ($dbh, $snapshot) = @_;
#-- Local variables
my $kind = 'C';
my $lastRefresh = undef;
my $snapid = $snapshot->{'snapid'};
my ($masterschema, $mastername) = retrieveMasterForSnapshot($dbh, $snapshot);
if ($masterschema ne undef) {
if (snapshotLogExists($dbh, $masterschema, $mastername)) { #-- has snapshot log
call_driver_function($dbh, 'snapshot_do', 'REGISTER', $masterschema, $mastername, $snapid);
$lastRefresh = getLastSnapshotLogRefresh($dbh, $masterschema, $mastername, $snapid);
elog NOTICE, "Last refresh on $lastRefresh" if $main::DEBUG==1;
$kind = getRefreshMethod($dbh, $snapshot, $masterschema, $mastername, $lastRefresh);
}
}
elog NOTICE, "Refresh method=$kind" if $main::DEBUG==1;
return ($masterschema, $mastername, $kind, $lastRefresh);
}
#--
#-- SUB: performCompleteRefresh
#-- Performs a COMPLETE REFRESH on a snapshot
#--
sub performCompleteRefresh {
#-- Function parameters
my ($dbh_local, $dbh_remote, $snapshot) = @_;
#-- Local variables
my $sql;
my $sth;
my $sth_local;
my $recs;
my $errors;
my $snapshotname;
my $targetColumnList;
#--Fetches the remote database query
$sql = $snapshot->{'query'};
elog NOTICE, "sql is \n$sql" if $main::DEBUG==1;
$sth = $dbh_remote->prepare($sql);
if ($dbh_remote->err) {
elog ERROR, "Could not fetch remote query: $sql [".$dbh_remote->errstr."]";
}
$sth->execute;
#-- Parameterized query placeholders (question marks)
my $qms = '?,' x $sth->{NUM_OF_FIELDS};
chop($qms);
$snapshotname = ($snapshot->{'pbt_table'} eq '') ? $snapshot->{'snapshotname'} : $snapshot->{'pbt_table'};
$targetColumnList = join(',', getColumnList($sth));
if ($snapshot->{'pbt_table'} eq '') {
#--Truncates (empty) the local SNAPSHOT
$sql = <<SQL;
TRUNCATE $snapshot->{'schemaname'}.$snapshotname
SQL
$dbh_local->do($sql);
if (! $dbh_local->err) {
elog NOTICE, "Snapshot truncated" if $main::DEBUG==1;
} else {
elog ERROR, "Could not truncate snapshot: $sql [".$dbh_local->errstr."]";
}
} else {
setTriggerStatus($dbh_local, $snapshot->{'schemaname'}, $snapshotname, "${snapshotname}_pbt\$_trg", FALSE);
$targetColumnList = "pbt\$,$targetColumnList";
$qms = "$snapshot->{'snapid'},$qms";
#-- Deletes all rows where pbt$ eq $snapid
$sql = <<SQL;
DELETE FROM $snapshot->{'schemaname'}.$snapshotname
WHERE pbt\$ = ?
SQL
my $sth_aux = $dbh_local->prepare($sql);
$sth_aux->execute(($snapshot->{'snapid'}));
if (! $sth_aux->err) {
elog NOTICE, "Snapshot cleaned." if $main::DEBUG==1;
} else {
elog ERROR, "Could not clean snapshot: $sql [".$sth_aux->errstr."]";
}
}
$dbh_local->commit;
#-- Fill the SNAPSHOT with the remote database query results
#-- Prepare the INSERT query that will be executed for each returned query
$sql = <<SQL;
INSERT INTO $snapshot->{'schemaname'}.$snapshotname($targetColumnList)
VALUES ($qms)
SQL
elog NOTICE, $sql if $main::DEBUG==1;
$sth_local = $dbh_local->prepare($sql);
#-- Set the type of target row parameters to the same type of source row columns
my @types = getSthColumnTypes($sth);
for (my $count=0; $count < $sth->{NUM_OF_FIELDS}; $count++) {
$sth_local->bind_param($count + 1, undef, @types[$count]);
}
#-- Loop through all fetched records and insert them one at a time into the SNAPSHOT
$recs = $errors = 0;
while(my @row = $sth->fetchrow_array) {
$sth_local->execute(@row);
if ($sth_local->err) {
if ($errors eq 0) {
elog ERROR, $sth_local->errstr;
}
++$errors;
}
++$recs;
if (($recs % 1000) == 0) {
$dbh_local->commit;
elog NOTICE, "Commit. Record #$recs" if $main::DEBUG==1;
}
}
if ($sth->err) {
elog ERROR, $sth->errstr;
}
$dbh_local->commit;
$sth_local->finish;
$sth->finish;
elog NOTICE, "All $recs records processed. $errors errors." if $main::DEBUG==1;
if ($snapshot->{'pbt_table'} ne '') {
setTriggerStatus($dbh_local, $snapshot->{'schemaname'}, $snapshotname, "${snapshotname}_pbt\$_trg", TRUE);
}
#-- Returns the number of records inserted
return $recs-$errors;
}
#--
#-- SUB: performFastRefresh
#-- Performs a FAST REFRESH on a snapshot
#--
sub performFastRefresh {
#-- Function parameters
my ($dbh_local, $dbh_remote, $snapshot, $masterschema, $mastername, $lastRefresh) = @_;
#-- Local variables
my $sth_local;
my $sth_remote;
my $masterKeys = getSnapshotLogColumns($dbh_remote, $masterschema, $mastername);
my $qms;
my $recs;
my $errors;
my $sql;
my $where;
my @types;
my ($logKeys, $arrLogKeys) = getLogKeys ($masterKeys);
my @keys = ($masterKeys, $logKeys, @$arrLogKeys);
my $snapshotLogName = getSnapshotLogName($dbh_remote, $masterschema, $mastername);
my $snapshotname;
my $targetColumnList;
#-- Remove old records from the snapshot
my $deleted = deleteOldSnapshotRecords($dbh_local, $dbh_remote, $snapshot, $masterschema, $mastername, $lastRefresh, @keys);
my @arrMasterKeys = getSourceKeysFromQuery($snapshot->{'query'}, split(',', $masterKeys));
my $filter = '';
for (my $i = 0; $i < @arrMasterKeys; ++$i) {
$filter .= ' AND source.'.@arrMasterKeys[$i].'=keys.'.@$arrLogKeys[$i];
}
$filter = substr($filter, 5);
my $logFilter = getInsertedUpdatedLogRecordsFilter($dbh_remote, $lastRefresh);
my $source = $snapshot->{'query'};
$sql = <<SQL;
SELECT source.*
FROM ($source) source,
(SELECT $logKeys
FROM $masterschema.$snapshotLogName
WHERE $logFilter) keys
WHERE $filter
SQL
elog NOTICE, "Filter SQL is: $sql" if $main::DEBUG==1;
elog NOTICE, "LastRefresh is: $lastRefresh" if $main::DEBUG==1;
$sth_remote = $dbh_remote->prepare($sql);
if ($dbh_remote->err) {
elog ERROR, "Could not apply filter rules to original query: $sql [".$dbh_remote->errstr."]";
}
#-- Fill it with the result query
$sth_remote->execute();
if ($dbh_remote->err) {
elog ERROR, "Could not retrieve records to insert: $sql [".$dbh_remote->errstr."]";
}
$qms = '?,' x $sth_remote->{NUM_OF_FIELDS};
chop($qms);
if ($snapshot->{'pbt_table'} ne '') {
$qms = "$snapshot->{'snapid'},$qms";
}
$snapshotname = ($snapshot->{'pbt_table'} eq '') ? $snapshot->{'snapshotname'} : $snapshot->{'pbt_table'};
$targetColumnList = join(',', getColumnList($sth_remote));
if ($snapshot->{'pbt_table'} ne '') {
$targetColumnList = "pbt\$,$targetColumnList";
}
$sql = <<SQL;
INSERT INTO $snapshot->{'schemaname'}.$snapshotname($targetColumnList)
VALUES ($qms)
SQL
$sth_local = $dbh_local->prepare($sql);
if ($dbh_local->err) {
elog ERROR, "Could not add new records into snapshot: $sql [".$dbh_local->errstr."]";
}
@types = getSthColumnTypes($sth_remote);
for (my $count=0; $count < $sth_remote->{NUM_OF_FIELDS}; $count++) {
$sth_local->bind_param($count + 1, undef, @types[$count]);
}
elog NOTICE, 'Fetching records.' if $main::DEBUG==1;
#-- Loop through all fetched records and insert them one at a time into the SNAPSHOT
$recs = 0;
$errors = 0;
setTriggerStatus($dbh_local, $snapshot->{'schemaname'}, $snapshotname, "${snapshotname}_pbt\$_trg", FALSE);
while(my @row = $sth_remote->fetchrow_array) {
$sth_local->execute(@row);
if ($sth_local->err) {
++$errors;
}
++$recs;
if (($recs % 1000) == 0) {
$dbh_local->commit;
elog NOTICE, "Inserted. Record #$recs" if $main::DEBUG==1;
}
}
setTriggerStatus($dbh_local, $snapshot->{'schemaname'}, $snapshotname, "${snapshotname}_pbt\$_trg", TRUE);
if (! $sth_remote->err) {
$recs = $recs - $errors;
elog NOTICE, "Inserted $recs modified records" if $main::DEBUG==1;
if ($errors > 0) {
elog ERROR, "$errors errors on insertion process" if $main::DEBUG==1;
}
} else {
elog ERROR, "Could insert modified snapshot records: $sql [".$sth_remote->errstr."]";
}
$dbh_local->commit;
$sth_local->finish;
return $recs;
}
#--
#-- SUB: finalizeRefresh
#-- Finalize the refresh process updating control fields.
#--
sub finalizeRefresh {
#-- Function parameters
my ($dbh, $masterschema, $mastername, $snapid, $refreshTime) = @_;
#-- Local variables
if ($masterschema ne undef) {
if (snapshotLogExists($dbh, $masterschema, $mastername)) { #-- has snapshot log
elog NOTICE, "Setting refresh time to $refreshTime" if $main::DEBUG==1;
updateLastSnapshotLogRefresh($dbh, $masterschema, $mastername, $refreshTime, $snapid);
updateNullSnaptimeRows($dbh, $masterschema, $mastername, $refreshTime);
}
}
}
#--
#-- SUB: purgeSnapshotLog
#-- Removes old SNAPSHOT LOG entries
#--
sub purgeSnapshotLog {
#-- Function parameters
my ($dbh, $masterschema, $mastername) = @_;
call_driver_function($dbh, 'snapshot_do', 'PURGELOG', $masterschema, $mastername, undef);
}
#--
#-- SUB: call_driver_function
#-- Calls a driver function (native) on the database pointed by the database handle ($dbh)
#--
sub call_driver_function {
my ($dbh, $function_name, $operation, $masterschema, $mastername, $additional) = @_;
$masterschema = uc($masterschema);
$mastername = uc($mastername);
elog NOTICE, "Calling DRIVER function $function_name ($operation, $masterschema, $mastername, $additional)" if $main::DEBUG==1;
if ($function_name eq 'snapshot_do') {
dbi_execute_stored_procedure($dbh, 'snapshot_do', ('123456', $operation, $masterschema, $mastername, $additional));
} elsif ($function_name eq 'snapshotlog_exists') {
return dbi_call_function($dbh, 'snapshotlog_exists', ($masterschema, $mastername));
} elsif ($function_name eq 'last_log_refresh') {
return dbi_call_function($dbh, 'last_log_refresh' , ($masterschema, $mastername, $additional));
} elsif ($function_name eq 'snapshotlog_columns') {
return dbi_call_function($dbh, 'snapshotlog_columns', ($masterschema, $mastername));
} elsif ($function_name eq 'snapshotlog_name') {
return dbi_call_function($dbh, 'snapshotlog_name', ($masterschema, $mastername));
} elsif ($function_name eq 'count_log_modified_rows') {
return dbi_call_function($dbh, 'count_log_modified_rows', ($masterschema, $mastername, $additional));
} elsif ($function_name eq 'snapshotlog_ud_filter') {
return dbi_call_function($dbh, 'snapshotlog_ud_filter', ($additional));
} elsif ($function_name eq 'snapshotlog_iu_filter') {
return dbi_call_function($dbh, 'snapshotlog_iu_filter', ($additional));
} else {
elog ERROR, "Could not call DRIVER function: $function_name";
}
$dbh->commit;
}
sub dbi_execute_stored_procedure {
use DBI::Const::GetInfoType;
my ($dbh, $procname, @parameters) = @_;
my $dbname = lc($dbh->get_info( $GetInfoType{SQL_DBMS_NAME} ));
my $sql;
my $sth;
my $qmarks = '?,' x @parameters;
chop $qmarks;
if ($dbname eq 'oracle') {
$sql = "BEGIN $procname($qmarks); END;";
} elsif ($dbname eq 'postgresql') {
$sql = "SELECT $procname($qmarks)";
} else {
$sql = "SELECT $procname($qmarks)";
}
$sth = $dbh->prepare($sql);
if ($dbh->err) {
elog ERROR, $dbh->errstr."' SQL='$sql'";
}
$sth->execute(@parameters);
if ($sth->err) {
elog ERROR, $sth->errstr."' SQL='$sql'";
}
}
sub dbi_call_function {
use DBI::Const::GetInfoType;
my ($dbh, $funcname, @parameters) = @_;
my $dbname = lc($dbh->get_info( $GetInfoType{SQL_DBMS_NAME} ));
my $sql;
my $sth;
my @row;
my $qmarks = '?,' x @parameters;
chop $qmarks;
if ($dbname eq 'oracle') {
$sql = "SELECT $funcname($qmarks) FROM DUAL";
} elsif ($dbname eq 'postgresql') {
$sql = "SELECT $funcname($qmarks)";
} else {
$sql = "SELECT $funcname($qmarks)";
}
$sth = $dbh->prepare($sql);
if ($dbh->err) {
elog ERROR, $dbh->errstr."' SQL='$sql'";
}
$sth->execute(@parameters);
if ($sth->err) {
elog ERROR, $sth->errstr."' SQL='$sql'";
}
@row = $sth->fetchrow_array;
my $result = @row[0];
return $result;
}
#--
#-- SUB: setTriggerStatus
#-- Enable/Disable a trigger
#--
sub setTriggerStatus {
#-- Function parameters
my ($dbh, $schemaname, $tablename, $triggername, $status) = @_;
#-- Local variables
my $sql;
my $sth;
my $tgenabled = ($status == TRUE) ? 'TRUE' : 'FALSE';
$sql = <<SQL;
SELECT c.oid
FROM pg_class c
LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname=?
AND c.relname=?
SQL
my ($tableid) = sqlLookup($dbh, $sql, ($schemaname, $tablename));
if ("$tableid" eq "") {
elog ERROR, "Could not retrieve TABLEID for table $schemaname.$tablename";
}
elog NOTICE, "Disabling trigger for table ID=$tableid NAME=$triggername" if $main::DEBUG==1;
$sql = <<SQL;
UPDATE pg_trigger
SET tgenabled = $tgenabled
WHERE tgrelid=?
AND tgname=?
SQL
elog NOTICE, $sql if $main::DEBUG==1;
$sth = $dbh->prepare($sql);
$sth->execute(($tableid, $triggername));
#-- Dummy update to refresh new trigger status
if ($sth->err) {
elog ERROR, $sth->errstr;
}
$sql = <<SQL;
UPDATE pg_class
SET relname=relname
WHERE oid=?
SQL
$sth = $dbh->prepare($sql);
$sth->execute(($tableid));
if ($sth->err) {
elog ERROR, $sth->errstr;
}
}
#--
#-- SUB: getColumnList
#-- Returns an array with all columns on a statement
#--
sub getColumnList {
my ($sth) = @_;
my @cols = ();
for ( my $i = 0 ; $i < $sth->{NUM_OF_FIELDS} ; $i++ ) {
push @cols, $sth->{NAME}->[$i];
}
return @cols;
}
#--
#-- getSourceKeysFromQuery
#--
sub getSourceKeysFromQuery {
my ($query, @masterKeys) = @_;
my @keys=();
my @related = $query =~ m/ +([0-9a-z_\$]+|".+") +as +([0-9a-z_\$]+|".+")/gi;
my $found;
for (my $i = 0; $i < @masterKeys; ++$i) {
$found = FALSE;
for (my $j = 0; $j < @related; $j += 2) {
if (@masterKeys[$i] eq @related[$j]) {
push(@keys, @related[$j+1]);
$found = TRUE;
last;
}
}
if (! $found) {
push(@keys, @masterKeys[$i]);
}
}
return @keys;
}
#--
#-- SUB: retrieveMasterForSnapshot
#-- Retrieve the MASTER tables for a SNAPSHOT
#--
sub retrieveMasterForSnapshot {
#-- Function parameters
my ($dbh, $snapshot) = @_;
#-- Local variables
my $masterschema;
my $mastername;
my $sql;
my $sth;
my $relatedCount;
my $tablename;
my @related=();
my @tables=();
$tablename = $snapshot->{'query'};
@related = $tablename =~ m/(from|join) +((([0-9a-z_\$]+|".+")\.)?([0-9a-z_\$]+|".+")( *, *(([0-9a-z_\$]+|".+")\.)?([0-9a-z_\$]+|".+"))*)/gi;
for (my $i = 1; $i < @related; $i += 9) {
push(@tables, @related[$i]);
}
if (@tables eq 0) {
elog ERROR, 'Origin TABLES not found !';
}
if (@tables eq 1) {
my $tablename = @tables[0];
elog NOTICE, "TABLE=[$tablename]" if $main::DEBUG==1;
($masterschema, $mastername) = split(/\./, $tablename);
elog NOTICE, "MASTERSCHEMA=[$masterschema] MASTERNAME=[$mastername]" if $main::DEBUG==1;
if ($mastername eq undef) {
#-- No Schema
elog NOTICE, 'Sorry, due to the complexity of your query, I could not guess the Schema name of your tables, so I will try a COMPLETE REFRESH instead.';
$mastername = $masterschema;
$snapshot->{'kind'} = 'C';
$masterschema = undef;
}
return ($masterschema, $mastername);
}
if (($snapshot->{'kind'} eq 'F') || ($snapshot->{'kind'} eq 'R')) {
elog NOTICE, "Refresh FAST only work with one-table only queries. Falling back to COMPLETE REFRESH. TABLES=[" . join(',', @tables) . ']';
$snapshot->{'kind'} = 'C';
}
return (undef, undef);
}
#--
#-- SUB: snapshotLogExists
#-- Returns whether a master object has a SNAPSHOT LOG
#--
sub snapshotLogExists {
#-- Function parameters
my ($dbh, $masterschema, $mastername) = @_;
my $result = call_driver_function($dbh, 'snapshotlog_exists', undef, $masterschema, $mastername, undef);
return ($result eq 'T');
}
#--
#-- SUB: getLastSnapshotLogRefresh
#-- Returns the date/time of the snapshot's last refresh
#--
sub getLastSnapshotLogRefresh {
#-- Function arguments
my ($dbh, $masterschema, $mastername, $snapid) = @_;
return call_driver_function($dbh, 'last_log_refresh', undef, $masterschema, $mastername, $snapid);
}
#--
#-- SUB: getRefreshMethod
#-- Returns the best refresh method supported by the SNAPSHOT refreshing process
#--
sub getRefreshMethod {
#-- Function parameters
my ($dbh, $snapshot, $masterschema, $mastername, $lastRefresh) = @_;
#-- Local variables
my $sql;
my $sth;
my $kind = $snapshot->{'kind'};
my $row;
if (($lastRefresh eq undef) || ($lastRefresh eq '1900-01-01 00:00:00')) { #-- It's the snapshot first refresh: always COMPLETE
elog NOTICE, 'First Refresh detected.' if $main::DEBUG==1;
$kind = 'C';
} elsif (($kind eq 'R') or ($kind eq 'F')) { #-- The snapshot was created with REFRESH FORCE or REFRESH FAST
#-- Test if LOG exists
if (snapshotLogExists($dbh, $masterschema, $mastername)) {
#-- count the rows at LOG
my $logcount = countSnapshotLogModifiedRows($dbh, $masterschema, $mastername, $lastRefresh);
#-- count the rows at MASTER
if ("$snapshot->{'pbt_table'}" ne '') {
$sql = <<SQL;
SELECT count(*) as total
FROM $masterschema.$snapshot->{'pbt_table'}
WHERE pbt\$ = $snapshot->{'snapid'}
SQL
} else {
$sql = <<SQL;
SELECT count(*) as total
FROM $masterschema.$mastername
SQL
}
my ($mastercount) = sqlLookup($dbh, $sql, ());
elog NOTICE, "Number of modified records: LOG=$logcount MASTER=$mastercount" if $main::DEBUG==1;
#-- FAST only if count(LOG) le count(MASTER) / 4
if (($kind eq 'F') || ($logcount le $mastercount)) {
$kind = 'F';
} else {
elog NOTICE, "Huge number of modified records!" if $main::DEBUG==1;
$kind = 'C';
}
} else {
elog NOTICE, 'Snapshot Log does not exist.' if $main::DEBUG==1;
$kind = 'C';
}
if (($snapshot->{'kind'} eq 'F') and ($kind ne 'F')) {
#-- Can't do REFRESH FAST
elog ERROR, 'Refresh FAST not supported. Did you set up a SNAPSHOT LOG correctly ?';
}
}
return $kind;
}
#--
#-- SUB: updateLastSnapshotLogRefresh
#-- Updates the SNAPSHOT LOG catalog with the SNAPSHOT last refresh
#--
sub updateLastSnapshotLogRefresh {
#-- Function parameters
my ($dbh, $masterschema, $mastername, $refreshTime, $snapid) = @_;
call_driver_function($dbh, 'snapshot_do', 'REFRESHED', $masterschema, $mastername, $refreshTime.'|'.$snapid);
}
#--
#-- SUB: updateNullSnaptimeRows
#-- Updates the SNAPSHOT LOG rows that were not viewed by anyone (snaptime column is NULL)
#--
sub updateNullSnaptimeRows {
#-- Function parameters
my ($dbh, $masterschema, $mastername, $refreshTime) = @_;
call_driver_function($dbh, 'snapshot_do', 'UPDATE_NULL', $masterschema, $mastername, $refreshTime);
}
#--
#-- SUB: getInsertedUpdatedLogRecordsFilter
#-- Returns the filter for retrieving snapshot log's inserted or updated records
#--
sub getInsertedUpdatedLogRecordsFilter {
my ($dbh, $lastRefresh) = @_;
return call_driver_function($dbh, 'snapshotlog_iu_filter', undef, undef, undef, $lastRefresh);
}
#--
#-- SUB: countSnapshotLogModifiedRows
#-- Returns the number of modified rows on the snapshot log
#--
sub countSnapshotLogModifiedRows {
#-- Function parameters
my ($dbh, $masterschema, $mastername, $lastRefresh) = @_;
return call_driver_function($dbh, 'count_log_modified_rows', undef, $masterschema, $mastername, $lastRefresh);
}
#--
#-- SUB: getSthColumnTypes
#-- Returns an array in which every index is the corresponding SQL data type for each column index of the STATEMENT parameter
#--
sub getSthColumnTypes {
#-- Function parameters
my ($sth) = @_;
#-- Local variables
my @types;
for (my $i = 0; $i < $sth->{NUM_OF_FIELDS}; ++$i) {
my $dbiType = $sth->{TYPE}->[$i];
@types[$i] = map2PgType(getUnifiedSqlType($dbiType));
}
return @types;
}
#--
#-- SUB: map2PgType
#-- Maps a DBI type to a PostgreSQL native type or a SQL type supported by PostgreSQL
#--
sub map2PgType {
use DBD::Pg;
my ($dbiType) = @_;
my $pgType;
#--------------------------------
#-- SQL datatype codes
#-- SQL_GUID (-11)
#-- SQL_WLONGVARCHAR (-10)
#-- SQL_WVARCHAR (-9)
#-- SQL_WCHAR (-8)
#-- SQL_BIT (-7)
#-- SQL_TINYINT (-6)
#-- SQL_BIGINT (-5)
#-- SQL_LONGVARBINARY (-4)
#-- SQL_VARBINARY (-3)
#-- SQL_BINARY (-2)
#-- SQL_LONGVARCHAR (-1)
#-- SQL_UNKNOWN_TYPE 0
#-- SQL_ALL_TYPES 0
#-- SQL_CHAR 1
#-- SQL_NUMERIC 2
#-- SQL_DECIMAL 3
#-- SQL_INTEGER 4
#-- SQL_SMALLINT 5
#-- SQL_FLOAT 6
#-- SQL_REAL 7
#-- SQL_DOUBLE 8
#-- SQL_DATETIME 9
#-- SQL_DATE 9
#-- SQL_INTERVAL 10
#-- SQL_TIME 10
#-- SQL_TIMESTAMP 11
#-- SQL_VARCHAR 12
#-- SQL_BOOLEAN 16
#-- SQL_UDT 17
#-- SQL_UDT_LOCATOR 18
#-- SQL_ROW 19
#-- SQL_REF 20
#-- SQL_BLOB 30
#-- SQL_BLOB_LOCATOR 31
#-- SQL_CLOB 40
#-- SQL_CLOB_LOCATOR 41
#-- SQL_ARRAY 50
#-- SQL_ARRAY_LOCATOR 51
#-- SQL_MULTISET 55
#-- SQL_MULTISET_LOCATOR 56
#-- SQL_TYPE_DATE 91
#-- SQL_TYPE_TIME 92
#-- SQL_TYPE_TIMESTAMP 93
#-- SQL_TYPE_TIME_WITH_TIMEZONE 94
#-- SQL_TYPE_TIMESTAMP_WITH_TIMEZONE 95
#-- SQL_INTERVAL_YEAR 101
#-- SQL_INTERVAL_MONTH 102
#-- SQL_INTERVAL_DAY 103
#-- SQL_INTERVAL_HOUR 104
#-- SQL_INTERVAL_MINUTE 105
#-- SQL_INTERVAL_SECOND 106
#-- SQL_INTERVAL_YEAR_TO_MONTH 107
#-- SQL_INTERVAL_DAY_TO_HOUR 108
#-- SQL_INTERVAL_DAY_TO_MINUTE 109
#-- SQL_INTERVAL_DAY_TO_SECOND 110
#-- SQL_INTERVAL_HOUR_TO_MINUTE 111
#-- SQL_INTERVAL_HOUR_TO_SECOND 112
#-- SQL_INTERVAL_MINUTE_TO_SECOND 113
#--------------------------------
#--PG_BOOL PG_BYTEA PG_CHAR PG_INT8 PG_INT2 PG_INT4 PG_TEXT PG_OID
#--PG_FLOAT4 PG_FLOAT8 PG_ABSTIME PG_RELTIME PG_TINTERVAL PG_BPCHAR
#--PG_VARCHAR PG_DATE PG_TIME PG_DATETIME PG_TIMESPAN PG_TIMESTAMP
if ( $dbiType == DBI::SQL_INTEGER ) {
$pgType = { pg_type => DBD::Pg::PG_INT8 };
} elsif ( $dbiType == DBI::SQL_SMALLINT ) {
$pgType = { pg_type => DBD::Pg::PG_INT4 };
} elsif ( $dbiType == DBI::SQL_TIMESTAMP ) {
$pgType = DBI::SQL_TIMESTAMP;
} elsif ( $dbiType == DBI::SQL_NUMERIC ) {
$pgType = DBI::SQL_NUMERIC;
} elsif ( $dbiType == DBI::SQL_BOOLEAN ) {
$pgType = DBI::SQL_BOOLEAN;
} elsif ( $dbiType == DBI::SQL_CHAR ) {
$pgType = DBI::SQL_CHAR;
} elsif ( $dbiType == DBI::SQL_VARCHAR ) {
$pgType = { pg_type => DBD::Pg::PG_VARCHAR };
} elsif ( $dbiType == DBI::SQL_CLOB ) {
$pgType = { pg_type => DBD::Pg::PG_VARCHAR };
} elsif ( $dbiType == DBI::SQL_DATE ) {
$pgType = DBI::SQL_DATE;
} elsif ( $dbiType == DBI::SQL_TIME ) {
$pgType = DBI::SQL_TIME;
} elsif ( $dbiType == DBI::SQL_INTERVAL ) {
$pgType = DBI::SQL_INTERVAL;
} elsif ( $dbiType == DBI::SQL_BLOB ) {
$pgType = { pg_type => DBD::Pg::PG_BYTEA };
} else {
$pgType = { pg_type => DBD::Pg::PG_TEXT };
}
return $pgType;
}
#--
#-- SUB: getSnapshotLogColumns
#-- Returns the snapshot log's filter/pk/oid columns
#--
sub getSnapshotLogColumns {
my ($dbh, $masterschema, $mastername) = @_;
my $result = call_driver_function($dbh, 'snapshotlog_columns', undef, $masterschema, $mastername, undef);
return $result;
}
#--
#-- SUB: getLogKeys
#-- Maps the MASTER keys to SNAPSHOT LOG keys
#--
sub getLogKeys {
#-- Function parameters
my ($masterKeys) = @_;
#-- Local variables
my @arrLogKeys = ();
my $logKeys;
my @keys = split(/,/, $masterKeys);
for (my $i = 0; $i < @keys; ++$i) {
my $col = @keys[$i];
if ($col eq 'oid') {
push (@arrLogKeys, 'm_row$$');
} else {
push (@arrLogKeys, $col);
}
}
$logKeys = join(',', @arrLogKeys);
return ($logKeys, \@arrLogKeys);
}
#--
#-- SUB: getSnapshotLogName
#-- Returns the snapshot log's table name
#--
sub getSnapshotLogName {
my ($dbh, $masterschema, $mastername) = @_;
return call_driver_function($dbh, 'snapshotlog_name', undef, $masterschema, $mastername, undef);
}
#--
#-- SUB: deleteOldSnapshotRecords
#-- Removes old records from the local snapshot based on the snapshot log
#--
sub deleteOldSnapshotRecords {
#-- Function parameters
my ($dbh_local, $dbh_remote, $snapshot, $masterschema, $mastername, $lastRefresh, @keys) = @_;
#-- Local variables
my $sql;
my $filter;
my $sth_local;
my $sth_remote;
my $qms;
my $recs;
my $sqlRow;
my ($masterKeys, $logKeys, @arrLogKeys) = @keys;
my $snapshotLogName = getSnapshotLogName($dbh_remote, $masterschema, $mastername);
my $snapshotname;
#-- Delete modified/removed records from SNAPSHOT
$filter = getUpdatedDeletedLogRecordsFilter($dbh_remote, $lastRefresh);
$sql = <<SQL;
SELECT $logKeys
FROM $masterschema.$snapshotLogName
WHERE $filter
SQL
$sth_remote = $dbh_remote->prepare($sql);
if ($dbh_remote->err) {
elog ERROR, "Could not retrieve records to delete: $sql [".$dbh_remote->errstr."]";
}
$sth_remote->execute();
if ($sth_remote->err) {
elog ERROR, "Could not retrieve records to delete: $sql [".$sth_remote->errstr."]";
}
my @master = getSourceKeysFromQuery($snapshot->{'query'}, split(',', $masterKeys));
$sqlRow = '';
for (my $i=0; $i < @master; ++$i) {
$sqlRow .= " AND " . @master->[$i] . '=?';
}
$sqlRow = substr($sqlRow, 5);
$snapshotname = ($snapshot->{'pbt_table'} eq '') ? $snapshot->{'snapshotname'} : $snapshot->{'pbt_table'};
$sql = <<SQL;
DELETE FROM $snapshot->{'schemaname'}.$snapshotname
WHERE $sqlRow
SQL
if ($snapshot->{'pbt_table'} ne '') {
$sql .= " AND pbt\$ = $snapshot->{'snapid'}";
}
$sth_local = $dbh_local->prepare($sql);
if ($dbh_local->err) {
elog ERROR, "Could not erase old records from snapshot: $sql [".$dbh_local->errstr."]";
}
$recs = 0;
setTriggerStatus($dbh_local, $snapshot->{'schemaname'}, $snapshotname, "${snapshotname}_pbt\$_trg", FALSE);
while(my @row = $sth_remote->fetchrow_array) {
$sth_local->execute(@row);
if ($sth_local->err) {
elog ERROR, $sth_local->errstr;
}
++$recs;
if (($recs % 1000) == 0) {
$dbh_local->commit;
elog NOTICE, "Deleted. Record #$recs" if $main::DEBUG==1;
}
}
setTriggerStatus($dbh_local, $snapshot->{'schemaname'}, $snapshotname, "${snapshotname}_pbt\$_trg", TRUE);
if (! $sth_remote->err) {
elog NOTICE, "Deleted $recs modified records" if $main::DEBUG==1;
} else {
elog ERROR, "Could delete modified snapshot records: $sql [".$sth_remote->errstr."]";
}
$dbh_local->commit;
$sth_local->finish;
return $recs;
}
#--
#-- SUB: getSnapshotLogName
#-- Returns the snapshot log's table name
#--
sub getSnapshotLogName {
my ($dbh, $masterschema, $mastername) = @_;
return call_driver_function($dbh, 'snapshotlog_name', undef, $masterschema, $mastername, undef);
}
#--
#-- SUB: getUnifiedSqlType
#-- Maps a set of SQL types into a single SQL type
#--
sub getUnifiedSqlType {
my ($dbiType) = @_;
#--------------------------------
#-- SQL datatype codes
#-- SQL_GUID (-11)
#-- SQL_WLONGVARCHAR (-10)
#-- SQL_WVARCHAR (-9)
#-- SQL_WCHAR (-8)
#-- SQL_BIT (-7)
#-- SQL_TINYINT (-6)
#-- SQL_BIGINT (-5)
#-- SQL_LONGVARBINARY (-4)
#-- SQL_VARBINARY (-3)
#-- SQL_BINARY (-2)
#-- SQL_LONGVARCHAR (-1)
#-- SQL_UNKNOWN_TYPE 0
#-- SQL_ALL_TYPES 0
#-- SQL_CHAR 1
#-- SQL_NUMERIC 2
#-- SQL_DECIMAL 3
#-- SQL_INTEGER 4
#-- SQL_SMALLINT 5
#-- SQL_FLOAT 6
#-- SQL_REAL 7
#-- SQL_DOUBLE 8
#-- SQL_DATETIME 9
#-- SQL_DATE 9
#-- SQL_INTERVAL 10
#-- SQL_TIME 10
#-- SQL_TIMESTAMP 11
#-- SQL_VARCHAR 12
#-- SQL_BOOLEAN 16
#-- SQL_UDT 17
#-- SQL_UDT_LOCATOR 18
#-- SQL_ROW 19
#-- SQL_REF 20
#-- SQL_BLOB 30
#-- SQL_BLOB_LOCATOR 31
#-- SQL_CLOB 40
#-- SQL_CLOB_LOCATOR 41
#-- SQL_ARRAY 50
#-- SQL_ARRAY_LOCATOR 51
#-- SQL_MULTISET 55
#-- SQL_MULTISET_LOCATOR 56
#-- SQL_TYPE_DATE 91
#-- SQL_TYPE_TIME 92
#-- SQL_TYPE_TIMESTAMP 93
#-- SQL_TYPE_TIME_WITH_TIMEZONE 94
#-- SQL_TYPE_TIMESTAMP_WITH_TIMEZONE 95
#-- SQL_INTERVAL_YEAR 101
#-- SQL_INTERVAL_MONTH 102
#-- SQL_INTERVAL_DAY 103
#-- SQL_INTERVAL_HOUR 104
#-- SQL_INTERVAL_MINUTE 105
#-- SQL_INTERVAL_SECOND 106
#-- SQL_INTERVAL_YEAR_TO_MONTH 107
#-- SQL_INTERVAL_DAY_TO_HOUR 108
#-- SQL_INTERVAL_DAY_TO_MINUTE 109
#-- SQL_INTERVAL_DAY_TO_SECOND 110
#-- SQL_INTERVAL_HOUR_TO_MINUTE 111
#-- SQL_INTERVAL_HOUR_TO_SECOND 112
#-- SQL_INTERVAL_MINUTE_TO_SECOND 113
#--------------------------------
#--PG_BOOL PG_BYTEA PG_CHAR PG_INT8 PG_INT2 PG_INT4 PG_TEXT PG_OID
#--PG_FLOAT4 PG_FLOAT8 PG_ABSTIME PG_RELTIME PG_TINTERVAL PG_BPCHAR
#--PG_VARCHAR PG_DATE PG_TIME PG_DATETIME PG_TIMESPAN PG_TIMESTAMP
if (($dbiType == DBI::SQL_INTEGER)) {
return DBI::SQL_INTEGER;
} elsif (($dbiType == DBI::SQL_SMALLINT)) {
return DBI::SQL_SMALLINT;
} elsif (($dbiType == DBI::SQL_TYPE_TIME_WITH_TIMEZONE)
|| ($dbiType == DBI::SQL_TYPE_TIMESTAMP_WITH_TIMEZONE)
|| ( $dbiType == DBI::SQL_DATETIME )
|| ( $dbiType == DBI::SQL_TIMESTAMP )
|| ( $dbiType == DBI::SQL_TYPE_TIMESTAMP ) ) { #--TIMESTAMP
return DBI::SQL_TIMESTAMP;
} elsif (($dbiType == DBI::SQL_NUMERIC )
|| ($dbiType == DBI::SQL_DECIMAL )
|| ($dbiType == DBI::SQL_FLOAT )
|| ($dbiType == DBI::SQL_REAL )
|| ($dbiType == DBI::SQL_DOUBLE )
#-- || ($dbiType == DBI::SQL_BIGINT )
|| ($dbiType == DBI::SQL_TINYINT )) { #--NUMERIC
return DBI::SQL_NUMERIC;
} elsif ( ( $dbiType == DBI::SQL_BOOLEAN )
|| ( $dbiType == DBI::SQL_BIT ) ) { #--BOOLEAN
return DBI::SQL_BOOLEAN;
} elsif ( ($dbiType == DBI::SQL_CHAR )
|| ( $dbiType == DBI::SQL_WCHAR ) ) { #--CHAR
return DBI::SQL_CHAR;
} elsif ( ($dbiType == DBI::SQL_VARCHAR )
|| ( $dbiType == DBI::SQL_WVARCHAR )) { #--VARCHAR
return DBI::SQL_VARCHAR;
} elsif ( ( $dbiType == DBI::SQL_LONGVARCHAR )
|| ( $dbiType == DBI::SQL_WLONGVARCHAR )
|| ( $dbiType == DBI::SQL_CLOB ) ) { #--CLOB
return DBI::SQL_CLOB;
} elsif ( ( $dbiType == DBI::SQL_DATE )
|| ( $dbiType == DBI::SQL_TYPE_DATE ) ) { #--DATE
return DBI::SQL_DATE;
} elsif ( ( $dbiType == DBI::SQL_TIME )
|| ( $dbiType == DBI::SQL_TYPE_TIME ) ) { #--TIME
return DBI::SQL_TIME;
} elsif ( ( $dbiType == DBI::SQL_INTERVAL )
|| ( $dbiType == DBI::SQL_INTERVAL_YEAR )
|| ( $dbiType == DBI::SQL_INTERVAL_MONTH )
|| ( $dbiType == DBI::SQL_INTERVAL_DAY )
|| ( $dbiType == DBI::SQL_INTERVAL_HOUR )
|| ( $dbiType == DBI::SQL_INTERVAL_MINUTE )
|| ( $dbiType == DBI::SQL_INTERVAL_SECOND )
|| ( $dbiType == DBI::SQL_INTERVAL_YEAR_TO_MONTH )
|| ( $dbiType == DBI::SQL_INTERVAL_DAY_TO_HOUR )
|| ( $dbiType == DBI::SQL_INTERVAL_DAY_TO_MINUTE )
|| ( $dbiType == DBI::SQL_INTERVAL_DAY_TO_SECOND )
|| ( $dbiType == DBI::SQL_INTERVAL_HOUR_TO_MINUTE )
|| ( $dbiType == DBI::SQL_INTERVAL_HOUR_TO_SECOND )
|| ( $dbiType == DBI::SQL_INTERVAL_MINUTE_TO_SECOND ) ) { #--INTERVAL
return DBI::SQL_INTERVAL;
} elsif ( ( $dbiType == DBI::SQL_BINARY )
|| ( $dbiType == DBI::SQL_VARBINARY )
|| ( $dbiType == DBI::SQL_LONGVARBINARY )
|| ( $dbiType == DBI::SQL_BLOB ) ) { #--BLOB
return $dbiType == DBI::SQL_BLOB;
} else {
return DBI::SQL_VARCHAR; #-- default: VARCHAR
}
}
#--
#-- SUB: getUpdatedDeletedLogRecordsFilter
#-- Returns the filter for retrieving snapshot log's updated or deleted records
#--
sub getUpdatedDeletedLogRecordsFilter {
my ($dbh, $lastRefresh) = @_;
return call_driver_function($dbh, 'snapshotlog_ud_filter', undef, undef, undef, $lastRefresh);
}
$BODY$
LANGUAGE 'plperlu' VOLATILE;
ALTER FUNCTION refresh_snapshot(schemaname text, snapshotname text) OWNER TO postgres;
COMMENT ON FUNCTION refresh_snapshot(schemaname text, snapshotname text) IS $$
This function is part of PostgreSQL::Snapshots project.
This is the function that refreshes the snapshot. The refreshing process may take a long time depending on the size of the result, the connection speed, etc. This is not a memory consuming process once all the process is divided in transaction chunks of 1000 records. It uses more CPU (because of Perl) and network (because of the resultset size) than memory.
$$;
--
-- ATTENTION: This is a free software.
-- View the LICENSE.txt file for license information
--
------------------------------------------------------------------------------
-- FUNCTION: public.create_snapshot_log
--
-- Creates a SNAPSHOT LOG on a TABLE to be used on FAST refreshes
------------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION public.create_snapshot_log(schemaname text, mastername text, withwhat text)
RETURNS bool AS
$BODY$
use strict;
use DBI;
use constant TRUE => 1;
use constant FALSE => "";
#-- Function parameters
my ($schemaname, $mastername, $withwhat) = @_;
#-- Set this to 1 for debugging messages
$main::DEBUG=0;
#-- Localhost superuser connection
my $dbh_local = DBI->connect('dbi:Pg:dbname=labpeTeste', 'postgres', undef, {AutoCommit => 0});
#-- variables
my $sql;
my $rs;
my $row;
my $sth;
my $masterKeyColumns;
my $logKeyColumns;
my $masterPkColumns;
my $masterFilterColumns;
my $flag;
my $log;
#-- Test if the SNAPSHOT LOG already exists
if (snapshotLogExists($dbh_local, $schemaname, $mastername)) {
elog ERROR, "Snapshot log on '$schemaname.$mastername' already exists! SQL=$sql";
}
if (! objectExists($dbh_local, $schemaname, $mastername)) {
elog ERROR, "Master '$schemaname.$mastername' does not exist! SQL=$sql";
}
($flag, $masterKeyColumns, $logKeyColumns, $masterPkColumns, $masterFilterColumns) = getKeyColumns($dbh_local, $schemaname, $mastername, $withwhat);
elog NOTICE, 'KEY='.join(',', keys %$logKeyColumns) . ' TYPES='.join(',', values %$logKeyColumns) . ' FLAG='.$flag if $main::DEBUG == 1;
#-- Create the Snapshot's Log Table
$log = createSnapshotLogTable($schemaname, $mastername, $logKeyColumns);
createSnapshotLogTableIndexes($schemaname, $mastername, $log, $logKeyColumns);
createSnapshotLogTrigger($dbh_local, $schemaname, $mastername, $log, $masterKeyColumns, $logKeyColumns);
createSnapshotLogEntry($dbh_local, $schemaname, $mastername, $flag, $log, $masterPkColumns, $masterFilterColumns);
#--call_driver_function($dbh_local, 'snapshot_do', 'REGISTER', $schemaname, $mastername, $snapid);
$dbh_local->commit;
$dbh_local->disconnect;
#-- All done. Let's return TRUE
return TRUE;
#--
#-- SUB: sqlLookup
#-- Returns a one row result from a SQL query
#--
sub sqlLookup {
my ($dbh, $sql, @params) = @_;
return sqlLookup2 ($dbh, $sql, \@params, TRUE);
}
sub sqlLookupSoft {
my ($dbh, $sql, @params) = @_;
return sqlLookup2 ($dbh, $sql, \@params, FALSE);
}
sub sqlLookup2 {
my ($dbh, $sql, $params, $errorsAreFatal) = @_;
my $sth;
my @row;
elog NOTICE, "Lookup at SQL:$sql with ".((@$params ne 0) ? join(',', @$params) : '<none>') if $main::DEBUG==1;
$sth = $dbh->prepare($sql);
if (@$params ne 0) {
$sth->execute(@$params);
} else {
$sth->execute();
}
if ($sth->err) {
if ($errorsAreFatal) {
elog ERROR, "Fatal error! SQL=$sql ERROR=".$sth->errstr;
}
}
my @row = $sth->fetchrow_array;
$sth->finish;
return @row;
}
#--
#-- SUB: snapshotLogExists
#-- Returns whether a master object has a SNAPSHOT LOG
#--
sub snapshotLogExists {
#-- Function parameters
my ($dbh, $masterschema, $mastername) = @_;
my $result = call_driver_function($dbh, 'snapshotlog_exists', undef, $masterschema, $mastername, undef);
return ($result eq 'T');
}
#--
#-- SUB: objectExists
#-- Returns whether an object exists or not
#--
sub objectExists {
my ($dbh, $schemaname, $snapshotname) = @_;
# local variables
my $sql;
my $sth;
my $row;
$sql = <<SQL;
SELECT count(*) as total
FROM pg_catalog.pg_class c
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname=? AND c.relname = ?
SQL
$sth = $dbh->prepare($sql);
$sth->execute(($schemaname, $snapshotname));
$row = $sth->fetchrow_hashref;
return ($row->{total} ne 0);
}
#--
#-- SUB: getKeyColumns
#-- Returns the keys(on master and on snapshot log) involved on a "create snapshot WITH" statement
#--
sub getKeyColumns {
#-- Function parameters
my ($dbh, $schemaname, $mastername, $withwhat) = @_;
#-- Local variables
my $sth;
my %masterKeyColumns = ();
my %logKeyColumns = ();
my %masterFilterColumns = ();
my %masterPkColumns = ();
my %tbFields = ();
my @pk;
my $fields;
my $flag = 0x20; #-- ALL=0x20 HASPK=0x40 ROWID=0x01 PK=0x02 FILTER=0x04
#-- Get object's metadata
$sth = getObjectMeta($dbh, $schemaname, $mastername);
#-- Creates a hashmap for FIELD NAME to FIELD TYPE mapping
for (my $i = 0; $i < $sth->{NUM_OF_FIELDS}; ++$i) {
my $fieldname = lc($sth->{NAME}->[$i]);
my $type = $dbh->type_info( [ $sth->{TYPE}->[$i] ] )->{TYPE_NAME};
%tbFields->{$fieldname} = $type;
}
#-- Get the table's primary key fields
@pk = $dbh->primary_key( undef, $schemaname, $mastername );
$dbh->commit;
#elog NOTICE, 'MASTER=(FIELDS:'.join(',', keys %tbFields).' TYPES:'.join(',', values %tbFields).' PK:'.join(',', @pk).')' if $main::DEBUG==1;
$withwhat = lc($withwhat);
$fields = '';
if ( $withwhat =~ m/.*\(.*\).*/ ) {
#-- We have parenthesis = additional fields
$fields = $withwhat;
#-- Save the additional fields
$fields =~ s/^.*\((.*)\).*$/$1/;
#-- Remove the additional fields from WITH
$withwhat =~ s/^(.*)\(.*\)(.*)$/$1$2/;
}
#-- Parse the remaining arguments of WITH
my @aux = split(/,/, $withwhat);
for (my $i=0; $i < @aux; ++$i) {
if (@aux[$i] eq 'oid') {
$flag |= 0x01;
%masterKeyColumns->{'oid'}=1;
%logKeyColumns->{'m_row$$'}='oid';
} elsif (@aux[$i] eq 'primary key') {
$flag |= 0x02;
$flag |= 0x40;
if (@pk eq 0) {
elog ERROR, 'Source does not have a PRIMARY KEY';
}
for (my $j=0; $j < @pk; ++$j) {
%masterPkColumns->{@pk[$j]}=1;
%masterKeyColumns->{@pk[$j]}=1;
%logKeyColumns->{@pk[$j]}=%tbFields->{@pk[$j]};
}
} else {
elog ERROR, 'Syntax error on WITH clause: '.@aux[$i];
}
}
#-- Parse the additional fields of WITH
@aux = split(/,/, $fields);
for (my $i=0; $i < @aux; ++$i) {
my $column = @aux[$i];
if (%tbFields->{$column} eq undef) {
elog ERROR, "Column not found: [$column]";
}
%masterFilterColumns->{$column}=1;
%masterKeyColumns->{$column}=1;
%logKeyColumns->{$column} = %tbFields->{$column};
$flag |= 0x04;
}
#--elog NOTICE, 'KEY='.join(',', keys %logKeyColumns) . ' TYPES='.join(',', values %logKeyColumns) if $main::DEBUG == 1;
return ($flag, \%masterKeyColumns, \%logKeyColumns, \%masterPkColumns, \%masterFilterColumns);
}
#--
#-- SUB: createSnapshotLogTable
#-- Creates the SNAPSHOT LOG table
#--
sub createSnapshotLogTable {
my ($schemaname, $mastername, $logKeyColumns) = @_;
my $strcolumns = '';
my $sql;
my $sth;
while (my ($key,$value) = each %$logKeyColumns) {
elog NOTICE, "$key=>$value" if $main::DEBUG==1;
my $entry = "$key $value";
$strcolumns = "$strcolumns$entry,\n";
}
$sql = <<SQL;
CREATE TABLE $schemaname.mlog\$_$mastername (
$strcolumns
snaptime\$\$ timestamp,
dmltype\$\$ varchar(1),
old_new\$\$ varchar(1),
change_vector\$\$ varchar(255)
)
SQL
elog NOTICE, $sql if $main::DEBUG==1;
$sth = spi_exec_query($sql);
if ($sth->{status} ne 'SPI_OK_UTILITY') {
elog ERROR, "Could not create the snapshot log for table '$schemaname.$mastername'. STATUS='".$sth->status."' SQL='$sql'";
}
return "mlog\$_$mastername";
}
#--
#-- SUB: createSnapshotLogTableIndexes
#-- Creates the INDEXES on the SNAPSHOT LOG table, for performance reasons only
#--
sub createSnapshotLogTableIndexes {
my ($schemaname, $mastername, $log, $logKeyColumns) = @_;
my $sth;
my $sql;
#-- Create the Snapshot's Log Table Indexes
my $keys = join(',', keys %$logKeyColumns);
$sql = <<SQL;
CREATE INDEX ${log}_ix1 on $schemaname.$log ($keys);
SQL
$sth = spi_exec_query($sql);
if ($sth->{status} ne 'SPI_OK_UTILITY') {
elog ERROR, "Could not create the snapshot log index for table '$schemaname.$mastername'. STATUS='".$sth->status."' SQL='$sql'";
}
$sql = <<SQL;
CREATE INDEX ${log}_ix2 on $schemaname.$log (snaptime\$\$, dmltype\$\$);
SQL
$sth = spi_exec_query($sql);
if ($sth->{status} ne 'SPI_OK_UTILITY') {
elog ERROR, "Could not create the snapshot log index for table '$schemaname.$mastername'. STATUS='".$sth->status."' SQL='$sql'";
}
}
#--
#-- SUB: createSnapshotLogTrigger
#-- Creates the triggers on the MASTER object in order to fill the SNAPSHOT LOG entries
#--
sub createSnapshotLogTrigger {
#-- Function parameters
my ($dbh, $schemaname, $mastername, $log, $masterKeyColumns, $logKeyColumns) = @_;
#-- Local variables
my $sth;
my $sql;
#-- Get object's metadata
$sth = getObjectMeta($dbh, $schemaname, $mastername);
#-- Create the PL/PgSQL code for computing the CHANGEVECTOR field
my $all_unset_cv = '0' x ($sth->{NUM_OF_FIELDS} / 8);
my $all_set_cv = 'F' x ($sth->{NUM_OF_FIELDS} / 8);
if (($sth->{NUM_OF_FIELDS} % 8) ne 0) {
$all_unset_cv = $all_unset_cv . '0';
$all_set_cv = $all_set_cv.sprintf("%x",(2^($sth->{NUM_OF_FIELDS} % 8)-1));
}
my $compute_change_vector = '';
for (my $i = 0; $i < $sth->{NUM_OF_FIELDS}; ++$i) {
my $fieldname = $sth->{NAME}->[$i];
my $value = 2**$i;
$compute_change_vector = $compute_change_vector . <<SQL;
IF (OLD.$fieldname <> NEW.$fieldname) THEN
CVB = CVB + $value;
END IF;
SQL
if ((($i + 1) % 8) eq 0) {
$compute_change_vector = $compute_change_vector . <<SQL;
CV = CV || to_hex(CVB);
ALL_CVB = ALL_CVB + CVB;
CVB = 0;
SQL
}
}
if (($sth->{NUM_OF_FIELDS} % 8) ne 0) {
$compute_change_vector = $compute_change_vector . <<SQL;
CV = CV || to_hex(CVB);
ALL_CVB = ALL_CVB + CVB;
SQL
}
$sth->finish;
$dbh->commit;
#-- Create the TRIGGER function
elog NOTICE, "Creating trigger function $schemaname.${log}_trgfn()" if $main::DEBUG==1;
my $oldMasterKeys = 'OLD.'.join(', OLD.', keys %$masterKeyColumns);
my $newMasterKeys = 'NEW.'.join(', NEW.', keys %$masterKeyColumns);
$sql = <<SQL;
CREATE OR REPLACE FUNCTION $schemaname.${log}_trgfn()
RETURNS trigger AS
\$BODYFN\$
DECLARE
CV varchar(255);
CVB integer;
ALL_CVB numeric;
BEGIN
IF (TG_OP = 'DELETE') THEN
INSERT INTO $schemaname.$log SELECT $oldMasterKeys,NULL,'D','O','$all_unset_cv';
ELSIF (TG_OP = 'UPDATE') THEN
CV = '';
CVB = 0;
ALL_CVB = 0;
$compute_change_vector
IF (ALL_CVB > 0) THEN
INSERT INTO $schemaname.$log SELECT $oldMasterKeys,NULL,'U','O',CV;
END IF;
IF (OLD.oid <> NEW.oid) THEN
INSERT INTO $schemaname.$log SELECT $newMasterKeys,NULL,'U','N',CV;
END IF;
ELSIF (TG_OP = 'INSERT') THEN
INSERT INTO $schemaname.$log SELECT $newMasterKeys,NULL,'I','N','$all_set_cv';
END IF;
RETURN NULL;
END;
\$BODYFN\$
LANGUAGE 'plpgsql' VOLATILE;
SQL
elog NOTICE, $sql if $main::DEBUG==1;
$dbh->do($sql);
if ($dbh->err) {
elog ERROR, "Could not create the snapshot log trigger function for table '$schemaname.$mastername'. ERR='".$dbh->errstr."' SQL='$sql'";
}
#-- Create TRIGGER on master table
elog NOTICE, "Creating trigger ${log}_trg on $schemaname.${mastername}" if $main::DEBUG==1;
$sql = <<SQL;
CREATE TRIGGER ${log}_trg AFTER INSERT OR UPDATE OR DELETE
ON $schemaname.${mastername} FOR EACH ROW
EXECUTE PROCEDURE $schemaname.${log}_trgfn ()
SQL
$dbh->do($sql);
if ($dbh->err) {
elog ERROR, "Could not create the snapshot log trigger on table '$schemaname.$mastername'. ERR='".$dbh->errstr."' SQL='$sql'";
}
}
sub createSnapshotLogEntry {
my ($dbh, $schemaname, $mastername, $flag, $log, $masterPkColumns, $masterFilterColumns) = @_;
my $sth;
#-- Add the Snapshot Log to the public.pg_mlogs table
$sql = <<SQL;
INSERT INTO public.pg_mlogs(masterschema, mastername, flag, log)
VALUES (?,?,?,?)
SQL
$sth = $dbh->prepare($sql);
if ($sth->err) {
elog ERROR, "Could not prepare the snapshot log entry on system table 'public.pg_mlogs'. ERROR='".$sth->errstr."' SQL='$sql'";
}
$sth->execute(($schemaname, $mastername, $flag, $log));
if ($sth->err) {
elog ERROR, "Could not create the snapshot log entry on system table 'public.pg_mlogs'. ERROR='".$sth->errstr."' SQL='$sql'";
}
my $snaplogid;
my @row;
$sql = <<SQL;
SELECT snaplogid
FROM public.pg_mlogs
WHERE masterschema=?
AND mastername=?
SQL
$sth = $dbh->prepare($sql);
$sth->execute(($schemaname, $mastername));
if ($sth->err) {
elog ERROR, $sth->errstr."' SQL='$sql'";
}
@row = $sth->fetchrow_array;
$snaplogid = @row[0];
elog NOTICE, 'SNAPLOGID='.$snaplogid if $main::DEBUG == 1;
createSnapshotLogEntryColumns($dbh, $schemaname, $mastername, $flag, $log, $snaplogid, $masterPkColumns, $masterFilterColumns);
}
#--
#-- SUB: call_driver_function
#-- Calls a driver function (native) on the database pointed by the database handle ($dbh)
#--
sub call_driver_function {
my ($dbh, $function_name, $operation, $masterschema, $mastername, $additional) = @_;
$masterschema = uc($masterschema);
$mastername = uc($mastername);
elog NOTICE, "Calling DRIVER function $function_name ($operation, $masterschema, $mastername, $additional)" if $main::DEBUG==1;
if ($function_name eq 'snapshot_do') {
dbi_execute_stored_procedure($dbh, 'snapshot_do', ('123456', $operation, $masterschema, $mastername, $additional));
} elsif ($function_name eq 'snapshotlog_exists') {
return dbi_call_function($dbh, 'snapshotlog_exists', ($masterschema, $mastername));
} elsif ($function_name eq 'last_log_refresh') {
return dbi_call_function($dbh, 'last_log_refresh' , ($masterschema, $mastername, $additional));
} elsif ($function_name eq 'snapshotlog_columns') {
return dbi_call_function($dbh, 'snapshotlog_columns', ($masterschema, $mastername));
} elsif ($function_name eq 'snapshotlog_name') {
return dbi_call_function($dbh, 'snapshotlog_name', ($masterschema, $mastername));
} elsif ($function_name eq 'count_log_modified_rows') {
return dbi_call_function($dbh, 'count_log_modified_rows', ($masterschema, $mastername, $additional));
} elsif ($function_name eq 'snapshotlog_ud_filter') {
return dbi_call_function($dbh, 'snapshotlog_ud_filter', ($additional));
} elsif ($function_name eq 'snapshotlog_iu_filter') {
return dbi_call_function($dbh, 'snapshotlog_iu_filter', ($additional));
} else {
elog ERROR, "Could not call DRIVER function: $function_name";
}
$dbh->commit;
}
sub dbi_execute_stored_procedure {
use DBI::Const::GetInfoType;
my ($dbh, $procname, @parameters) = @_;
my $dbname = lc($dbh->get_info( $GetInfoType{SQL_DBMS_NAME} ));
my $sql;
my $sth;
my $qmarks = '?,' x @parameters;
chop $qmarks;
if ($dbname eq 'oracle') {
$sql = "BEGIN $procname($qmarks); END;";
} elsif ($dbname eq 'postgresql') {
$sql = "SELECT $procname($qmarks)";
} else {
$sql = "SELECT $procname($qmarks)";
}
$sth = $dbh->prepare($sql);
if ($dbh->err) {
elog ERROR, $dbh->errstr."' SQL='$sql'";
}
$sth->execute(@parameters);
if ($sth->err) {
elog ERROR, $sth->errstr."' SQL='$sql'";
}
}
sub dbi_call_function {
use DBI::Const::GetInfoType;
my ($dbh, $funcname, @parameters) = @_;
my $dbname = lc($dbh->get_info( $GetInfoType{SQL_DBMS_NAME} ));
my $sql;
my $sth;
my @row;
my $qmarks = '?,' x @parameters;
chop $qmarks;
if ($dbname eq 'oracle') {
$sql = "SELECT $funcname($qmarks) FROM DUAL";
} elsif ($dbname eq 'postgresql') {
$sql = "SELECT $funcname($qmarks)";
} else {
$sql = "SELECT $funcname($qmarks)";
}
$sth = $dbh->prepare($sql);
if ($dbh->err) {
elog ERROR, $dbh->errstr."' SQL='$sql'";
}
$sth->execute(@parameters);
if ($sth->err) {
elog ERROR, $sth->errstr."' SQL='$sql'";
}
@row = $sth->fetchrow_array;
my $result = @row[0];
return $result;
}
#-- Dependencies
#--
#-- SUB: getObjectMeta
#-- Returns an empty result STATEMENT in order to retrieve the object's METADATA
#--
sub getObjectMeta {
my ($dbh, $schemaname, $mastername) = @_;
my $sql;
my $sth;
$sql = <<SQL;
SELECT * from $schemaname.$mastername
WHERE 1=0
SQL
$sth = $dbh->prepare($sql);
if (! $sth->err) {
$sth->execute();
}
return $sth;
}sub createSnapshotLogEntryColumns {
my ($dbh, $schemaname, $mastername, $flag, $log, $snaplogid, $masterPkColumns, $masterFilterColumns) = @_;
my $sql;
my $sth;
#-- Add the Snapshot Log Columna to the public.pg_mlog_refcols table
$sql = <<SQL;
INSERT INTO public.pg_mlog_refcols(snaplogid, masterschema, mastername, colname, flag)
VALUES (?,?,?,?,?)
SQL
$sth = $dbh->prepare($sql);
if ($sth->err) {
elog ERROR, "Could not prepare the snapshot log column entry on system table 'public.pg_mlog_refcols'. ERROR='".$sth->errstr."' SQL='$sql'";
}
if ($flag & 0x42) {
#-- PK
while (my ($key,$value) = each %$masterPkColumns) {
$sth->execute($snaplogid, $schemaname, $mastername, $key, 2);
if ($sth->err) {
elog ERROR, "Could not create the snapshot log column entry on system table 'public.pg_mlog_refcols'. ERROR='".$sth->errstr."' SQL='$sql'";
}
}
}
if ($flag & 0x04) {
#-- FILTER
while (my ($key,$value) = each %$masterFilterColumns) {
$sth->execute($snaplogid, $schemaname, $mastername, $key, 2);
if ($sth->err) {
elog ERROR, "Could not create the snapshot log column entry on system table 'public.pg_mlog_refcols'. ERROR='".$sth->errstr."' SQL='$sql'";
}
}
}
}
$BODY$
LANGUAGE 'plperlu' VOLATILE;
ALTER FUNCTION public.create_snapshot_log(schemaname text, mastername text, withwhat text) OWNER TO postgres;
COMMENT ON FUNCTION public.create_snapshot_log(schemaname text, mastername text, withwhat text) IS $$
This function is part of PostgreSQL::Snapshots project.
This is the function that creates a snapshot log.
$$;
------------------------------------------------------------------------------
-- FUNCTION: public.drop_snapshot_log
--
-- Removes a previously created SNAPSHOT LOG
------------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION public.drop_snapshot_log(schemaname text, mastername text)
RETURNS bool AS
$BODY$
use strict;
use DBI;
use constant TRUE => 1;
use constant FALSE => "";
#-- Function parameters
my ($schemaname, $mastername) = @_;
#-- Set this to 1 for debugging messages
$main::DEBUG=0;
#-- Localhost superuser connection
my $dbh_local = DBI->connect('dbi:Pg:dbname=labpeTeste', 'postgres', undef, {AutoCommit => 0});
my $sql;
my $rs;
my $row;
my $log;
if (! snapshotLogExists($dbh_local, $schemaname, $mastername)) {
elog ERROR, "Snapshot log on '$schemaname.$mastername' does not exist!";
}
elog NOTICE, "Snapshot log on '$schemaname.$mastername' exists..." if $main::DEBUG==1;
$log = getSnapshotLogName($dbh_local, $schemaname, $mastername);
elog NOTICE, "Snapshot log on '$schemaname.$mastername' is $log" if $main::DEBUG==1;
$dbh_local->commit;
#-- Drop the Snapshot's Log Table
$sql = <<SQL;
DROP TABLE $schemaname.$log
SQL
$rs = spi_exec_query($sql);
if ($rs->{status} ne 'SPI_OK_UTILITY') {
elog ERROR, "Could not drop Snapshot log '$schemaname.$log'. STATUS='".$rs->status."' SQL='$sql'";
}
elog NOTICE, "Snapshot log '$log' dropped." if $main::DEBUG==1;
#-- Delete the Snapshot Log entry
$sql = <<SQL;
DELETE FROM public.pg_mlogs
WHERE masterschema=?
AND mastername=?
SQL
$rs = $dbh_local->prepare($sql);
$rs->execute($schemaname, $mastername);
if ($rs->err) {
elog ERROR, "Could not delete snapshot log entry on system table 'public.pg_mlogs'. ERR='".$rs->errstr."' SQL='$sql'";
}
elog NOTICE, "Snapshot log entry removed." if $main::DEBUG==1;
#-- Drop triggers on master table
$sql = <<SQL;
DROP TRIGGER ${log}_trg ON $schemaname.${mastername}
SQL
$dbh_local->do($sql);
if ($dbh_local->err) {
elog ERROR, "Could not drop the snapshot log trigger on table '$schemaname.$mastername'. ERR='".$dbh_local->errstr."' SQL='$sql'";
}
elog NOTICE, "Snapshot log table trigger dropped." if $main::DEBUG==1;
#-- Drop trigger's function
$sql = <<SQL;
DROP FUNCTION $schemaname.${log}_trgfn();
SQL
$dbh_local->do($sql);
if ($dbh_local->err) {
elog ERROR, "Could not drop the snapshot log trigger function '$schemaname.mlog\$_${mastername}_trgfn()'. ERR='".$dbh_local->errstr."' SQL='$sql'";
}
elog NOTICE, "Snapshot log trigger function dropped." if $main::DEBUG==1;
$dbh_local->commit;
$dbh_local->disconnect;
#-- All done. Let's return TRUE
return TRUE;
#--
#-- SUB: sqlLookup
#-- Returns a one row result from a SQL query
#--
sub sqlLookup {
my ($dbh, $sql, @params) = @_;
return sqlLookup2 ($dbh, $sql, \@params, TRUE);
}
sub sqlLookupSoft {
my ($dbh, $sql, @params) = @_;
return sqlLookup2 ($dbh, $sql, \@params, FALSE);
}
sub sqlLookup2 {
my ($dbh, $sql, $params, $errorsAreFatal) = @_;
my $sth;
my @row;
elog NOTICE, "Lookup at SQL:$sql with ".((@$params ne 0) ? join(',', @$params) : '<none>') if $main::DEBUG==1;
$sth = $dbh->prepare($sql);
if (@$params ne 0) {
$sth->execute(@$params);
} else {
$sth->execute();
}
if ($sth->err) {
if ($errorsAreFatal) {
elog ERROR, "Fatal error! SQL=$sql ERROR=".$sth->errstr;
}
}
my @row = $sth->fetchrow_array;
$sth->finish;
return @row;
}
#--
#-- SUB: snapshotLogExists
#-- Returns whether a master object has a SNAPSHOT LOG
#--
sub snapshotLogExists {
#-- Function parameters
my ($dbh, $masterschema, $mastername) = @_;
my $result = call_driver_function($dbh, 'snapshotlog_exists', undef, $masterschema, $mastername, undef);
return ($result eq 'T');
}
#--
#-- SUB: getSnapshotLogName
#-- Returns the snapshot log's table name
#--
sub getSnapshotLogName {
my ($dbh, $masterschema, $mastername) = @_;
return call_driver_function($dbh, 'snapshotlog_name', undef, $masterschema, $mastername, undef);
}
#-- Dependencies
#--
#-- SUB: call_driver_function
#-- Calls a driver function (native) on the database pointed by the database handle ($dbh)
#--
sub call_driver_function {
my ($dbh, $function_name, $operation, $masterschema, $mastername, $additional) = @_;
$masterschema = uc($masterschema);
$mastername = uc($mastername);
elog NOTICE, "Calling DRIVER function $function_name ($operation, $masterschema, $mastername, $additional)" if $main::DEBUG==1;
if ($function_name eq 'snapshot_do') {
dbi_execute_stored_procedure($dbh, 'snapshot_do', ('123456', $operation, $masterschema, $mastername, $additional));
} elsif ($function_name eq 'snapshotlog_exists') {
return dbi_call_function($dbh, 'snapshotlog_exists', ($masterschema, $mastername));
} elsif ($function_name eq 'last_log_refresh') {
return dbi_call_function($dbh, 'last_log_refresh' , ($masterschema, $mastername, $additional));
} elsif ($function_name eq 'snapshotlog_columns') {
return dbi_call_function($dbh, 'snapshotlog_columns', ($masterschema, $mastername));
} elsif ($function_name eq 'snapshotlog_name') {
return dbi_call_function($dbh, 'snapshotlog_name', ($masterschema, $mastername));
} elsif ($function_name eq 'count_log_modified_rows') {
return dbi_call_function($dbh, 'count_log_modified_rows', ($masterschema, $mastername, $additional));
} elsif ($function_name eq 'snapshotlog_ud_filter') {
return dbi_call_function($dbh, 'snapshotlog_ud_filter', ($additional));
} elsif ($function_name eq 'snapshotlog_iu_filter') {
return dbi_call_function($dbh, 'snapshotlog_iu_filter', ($additional));
} else {
elog ERROR, "Could not call DRIVER function: $function_name";
}
$dbh->commit;
}
sub dbi_execute_stored_procedure {
use DBI::Const::GetInfoType;
my ($dbh, $procname, @parameters) = @_;
my $dbname = lc($dbh->get_info( $GetInfoType{SQL_DBMS_NAME} ));
my $sql;
my $sth;
my $qmarks = '?,' x @parameters;
chop $qmarks;
if ($dbname eq 'oracle') {
$sql = "BEGIN $procname($qmarks); END;";
} elsif ($dbname eq 'postgresql') {
$sql = "SELECT $procname($qmarks)";
} else {
$sql = "SELECT $procname($qmarks)";
}
$sth = $dbh->prepare($sql);
if ($dbh->err) {
elog ERROR, $dbh->errstr."' SQL='$sql'";
}
$sth->execute(@parameters);
if ($sth->err) {
elog ERROR, $sth->errstr."' SQL='$sql'";
}
}
sub dbi_call_function {
use DBI::Const::GetInfoType;
my ($dbh, $funcname, @parameters) = @_;
my $dbname = lc($dbh->get_info( $GetInfoType{SQL_DBMS_NAME} ));
my $sql;
my $sth;
my @row;
my $qmarks = '?,' x @parameters;
chop $qmarks;
if ($dbname eq 'oracle') {
$sql = "SELECT $funcname($qmarks) FROM DUAL";
} elsif ($dbname eq 'postgresql') {
$sql = "SELECT $funcname($qmarks)";
} else {
$sql = "SELECT $funcname($qmarks)";
}
$sth = $dbh->prepare($sql);
if ($dbh->err) {
elog ERROR, $dbh->errstr."' SQL='$sql'";
}
$sth->execute(@parameters);
if ($sth->err) {
elog ERROR, $sth->errstr."' SQL='$sql'";
}
@row = $sth->fetchrow_array;
my $result = @row[0];
return $result;
}
$BODY$
LANGUAGE 'plperlu' VOLATILE;
ALTER FUNCTION public.drop_snapshot_log(schemaname text, mastername text) OWNER TO postgres;
COMMENT ON FUNCTION public.drop_snapshot_log(schemaname text, mastername text) IS $$
This function is part of PostgreSQL::Snapshots project.
This is the function that drops a snapshot log.
$$;
_______________________________________________
Grupo de Usuários do PostgreSQL no Brasil
Antes de perguntar consulte o manual
http://pgdocptbr.sourceforge.net/Para editar suas opções ou sair da lista acesse a página da lista em:
http://pgfoundry.org/mailman/listinfo/brasil-usuarios