Re: Problemas com Pg-snapshot

View: New views
1 Messages — Rating Filter:   Alert me  

Parent Message unknown Re: Problemas com Pg-snapshot

by PHP Developer :: Rate this Message:

Reply to Author | View Threaded | Show Only this Message

Claro!
Obrigado!

Um abraço!


wallace reis wrote:
On 12/18/06, Php developer <brunophpdeveloper@...> wrote:
Não deu certo amigão. Fica na mesma!
Obrigado de qualquer maneira!
Se alguém tiver alguma outra sugestão, agradeço!!

Você pode enviar o código da função?

--
wallace reis
Núcleo de Biologia Computacional e
Gestão de Informações Biotecnológicas/LABBI

_______________________________________________ Grupo de Usuários do PostgreSQL no Brasil Antes de perguntar consulte o manual http://pgdocptbr.sourceforge.net/ Para editar suas opções ou sair da lista acesse a página da lista em: http://pgfoundry.org/mailman/listinfo/brasil-usuarios


--
-- ATTENTION: This is free software.
--            See the LICENSE.txt file for license information
--

------------------------------------------------------------------------------
-- TABLE: public.pg_dblinks
--
-- Table that will hold the DBLINK connection info
-- Only visible to the 'postgres' user
------------------------------------------------------------------------------
-- Catalog of database links: one row per DBI connection definition registered
-- through public.create_dblink().
CREATE TABLE public.pg_dblinks (
        dblinkid serial,                -- surrogate key, referenced by public.pg_snapshots.dblinkid
        dblinkname text NOT NULL,       -- link name; create_dblink() stores it lower-cased
        datasource text NOT NULL,       -- DBI data source string handed to DBI->connect
        username text,                  -- remote user name (optional)
        password text,                  -- remote password, stored in clear text
        attributes text,                -- Perl source text of the DBI attribute hash (eval'ed by the functions)
        ctime timestamp DEFAULT now(),  -- row creation time
        CONSTRAINT pg_dblink_pk PRIMARY KEY (dblinkid)
);
-- A link is looked up by name, so names must be unique (and, being NOT NULL,
-- the index cannot be bypassed with NULL names).
CREATE UNIQUE INDEX pg_dblinks_uix ON public.pg_dblinks(dblinkname);
-- This table holds passwords: unlike the other catalog tables it is never
-- granted to public. The REVOKE makes that intent explicit.
REVOKE ALL ON public.pg_dblinks FROM public;
COMMENT ON TABLE public.pg_dblinks IS
$$
This table contains the necessary connection information for a DBI
connection.
$$;

------------------------------------------------------------------------------
-- TABLE: public.pg_snapshots
--
-- Table that will hold the SNAPSHOT info
------------------------------------------------------------------------------
-- Catalog of snapshots: one row per snapshot created by public.create_snapshot().
CREATE TABLE public.pg_snapshots (
        snapid serial,
        schemaname text not null,       -- schema of the snapshot (stored lower-cased by create_snapshot)
        snapshotname text not null,     -- snapshot name (stored lower-cased by create_snapshot)
        query           text not null,  -- query that defines the snapshot contents
        dblinkid integer,               -- NULL for local snapshots; same type as pg_dblinks.dblinkid (serial)
        ctime timestamp default now(),  -- row creation time
        snaptime timestamp,             -- presumably the time of the last refresh -- TODO confirm
        elapsedtime interval,           -- presumably the duration of the last refresh -- TODO confirm
        auto_time time,
        auto_interval interval,
        kind char(1) check (kind in ('F', 'C', 'R')) default 'R', --F=FAST C=COMPLETE R=FORCE
        pbt_table text, -- ON PREBUILT TABLE (prebuilt table name)
        CONSTRAINT pg_snapshots_pk primary key (snapid),
        CONSTRAINT pg_snapshots_dblinks_fk foreign key (dblinkid) references public.pg_dblinks(dblinkid) ON UPDATE RESTRICT ON DELETE RESTRICT
);
CREATE UNIQUE INDEX pg_snapshots_uix ON public.pg_snapshots(schemaname, snapshotname);
CREATE INDEX pg_snapshots_1_ix ON public.pg_snapshots(schemaname, pbt_table);
GRANT select ON public.pg_snapshots to public;
COMMENT ON TABLE public.pg_snapshots IS
$$
This table contains the list of snapshots on your system and the
query/dblink used to create each one and to fill it up.
$$;

------------------------------------------------------------------------------
-- TABLE: public.pg_mlogs
--
-- Table that will hold the SNAPSHOT LOG info
------------------------------------------------------------------------------
-- Catalog of snapshot logs: one row per master table (schema + name).
CREATE TABLE public.pg_mlogs (
    snaplogid    SERIAL  CONSTRAINT pg_mlogs_pk PRIMARY KEY,
    masterschema TEXT    NOT NULL,  -- schema of the master table
    mastername   TEXT    NOT NULL,  -- name of the master table
    flag         NUMERIC NOT NULL,
    log          TEXT    NOT NULL   -- presumably the name of the log table -- TODO confirm
);

-- Everybody may read the catalog.
GRANT SELECT ON public.pg_mlogs TO PUBLIC;

-- At most one log per master table.
CREATE UNIQUE INDEX pg_mlogs_uix
    ON public.pg_mlogs (masterschema, mastername);

COMMENT ON TABLE public.pg_mlogs IS
$$
This table contains the list of snapshot logs.
$$;

------------------------------------------------------------------------------
-- TABLE: public.pg_mlog_refcols
--
-- Table that will hold the SNAPSHOT LOG columns info
------------------------------------------------------------------------------
-- Columns tracked by each snapshot log (child rows of public.pg_mlogs).
CREATE TABLE public.pg_mlog_refcols (
        snaplogid integer,              -- same type as the referenced pg_mlogs.snaplogid (serial), so
                                        -- foreign key checks compare like with like
        masterschema text NOT NULL,     -- schema of the master table
        mastername text NOT NULL,       -- name of the master table
        colname text NOT NULL,          -- column of the master table
        oldest timestamp NOT NULL DEFAULT now(),
        flag numeric(1),
        CONSTRAINT pg_mlog_refcols_pk PRIMARY KEY (snaplogid, colname),
        CONSTRAINT pg_mlog_refcols_mlog_fk FOREIGN KEY (snaplogid) REFERENCES public.pg_mlogs(snaplogid) ON DELETE CASCADE
);
CREATE UNIQUE INDEX pg_mlog_refcols_uix ON public.pg_mlog_refcols(masterschema, mastername, colname);
GRANT select ON public.pg_mlog_refcols to public;
COMMENT ON TABLE public.pg_mlog_refcols IS
$$
This table contains the column information of the snapshot logs.
$$;

------------------------------------------------------------------------------
-- TABLE: public.pg_slogs
--
-- Table that will hold the SNAPSHOT LOG refresh info
------------------------------------------------------------------------------
-- Link table between snapshot logs and snapshots; rows disappear together
-- with either parent (ON DELETE CASCADE on both foreign keys).
CREATE TABLE public.pg_slogs (
    snaplogid BIGINT
        CONSTRAINT pg_slogs_mlog_fk
        REFERENCES public.pg_mlogs (snaplogid) ON DELETE CASCADE,
    snapid    BIGINT
        CONSTRAINT pg_slogs_snap_fk
        REFERENCES public.pg_snapshots (snapid) ON DELETE CASCADE,
    snaptime  TIMESTAMP NOT NULL,
    userid    INTEGER   NOT NULL,
    CONSTRAINT pg_slogs_pk PRIMARY KEY (snaplogid, snapid)
);

-- Everybody may read the catalog.
GRANT SELECT ON public.pg_slogs TO PUBLIC;

-- Access paths by snapshot, by snapshot + time and by log + time.
CREATE UNIQUE INDEX pg_slogs_ix_1 ON public.pg_slogs (snapid, snaplogid);
CREATE UNIQUE INDEX pg_slogs_ix_2 ON public.pg_slogs (snapid, snaptime, snaplogid);
CREATE UNIQUE INDEX pg_slogs_ix_3 ON public.pg_slogs (snaplogid, snaptime, snapid);

COMMENT ON TABLE public.pg_slogs IS
$$
This table contains the list of master tables that have snapshot logs and are referenced by snapshots elsewhere. This table, along with the master table log, allows fast refreshes of snapshots based on snapshot logs.
$$;
--
-- ATTENTION: This is free software.
--            See the LICENSE.txt file for license information
--

------------------------------------------------------------------------------
-- FUNCTION: public.create_dblink
--
-- Creates a DATABASE LINK to any database supported by Perl DBI
------------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION public.create_dblink(dblinkname text, datasource text, username text, password text, attributes text)
  RETURNS bool AS
$BODY$
#-- Registers a database link.
#--   dblinkname          name of the link (stored lower-cased)
#--   datasource          DBI data source string
#--   username, password  credentials for the remote database
#--   attributes          Perl source text that evaluates to the DBI attribute hash reference
#-- The remote connection is opened once as a test; on success a row is added
#-- to public.pg_dblinks through a separate local superuser connection.
#-- Returns TRUE. Every failure is raised with elog ERROR.
use strict;
use DBI;
use constant TRUE => 1;
use constant FALSE => "";
#-- Function parameters
my ($dblinkname, $datasource, $username, $password, $attributes) = @_;
$dblinkname = lc($dblinkname);
#-- NOTE(review): the attributes text is executed as Perl code. Only trusted
#-- users should be allowed to call this function.
my $attr_href = eval($attributes);
#-- Set this to 1 for debugging messages
$main::DEBUG=0;
#-- Localhost superuser connection (database name and user are fixed here)
my $dbh_local = DBI->connect('dbi:Pg:dbname=labpeTeste', 'postgres', undef, {AutoCommit => 0});
if (! $dbh_local) {
        elog ERROR, "Could not connect to the local database ERROR=".$DBI::errstr;
}

#-- Test if DBLINK already exists
if (dblinkExists($dbh_local, $dblinkname)) {
        $dbh_local->rollback;
        $dbh_local->disconnect;
        elog ERROR, "DBLink $dblinkname already created";
}

#-- Try to connect to the remote database (dbiGetConnection raises on failure)
my $dbh = dbiGetConnection($datasource, $username, $password, $attr_href);
$dbh->disconnect;

#-- Create the DBLINK entry on public.pg_dblinks table
my $sql = <<SQL;
INSERT INTO public.pg_dblinks(dblinkname, datasource, username, password, attributes)
VALUES (?, ?, ?, ?, ?)
SQL
my $sth = $dbh_local->prepare($sql);
$sth->execute($dblinkname, $datasource, $username, $password, $attributes);

if ($sth->err) {
        #-- keep the message, then release the local connection before raising
        my $reason = $sth->errstr;
        $dbh_local->rollback;
        $dbh_local->disconnect;
        elog ERROR, "Could not create DBLink '$dblinkname' ERROR=". $reason;
}
elog NOTICE, "DBLink created" if $main::DEBUG==1;
$dbh_local->commit;
$dbh_local->disconnect;

#-- All done. Let's return TRUE
return TRUE;

#--
#-- SUB: sqlLookup
#-- Returns the first row of a SQL query; errors are fatal
#--
sub sqlLookup {
        my ($dbh, $sql, @params) = @_;
        return sqlLookup2 ($dbh, $sql, \@params, TRUE);
}
#--
#-- SUB: sqlLookupSoft
#-- Same as sqlLookup, but errors are not raised
#--
sub sqlLookupSoft {
        my ($dbh, $sql, @params) = @_;
        return sqlLookup2 ($dbh, $sql, \@params, FALSE);
}

#--
#-- SUB: sqlLookup2
#-- Runs $sql with the bind values in @$params and returns the first row as a
#-- list. On failure raises an ERROR when $errorsAreFatal is true, otherwise
#-- returns an empty list.
#--
sub sqlLookup2 {
        my ($dbh, $sql, $params, $errorsAreFatal) = @_;

        elog NOTICE, "Lookup at SQL:$sql with ".(@$params ? join(',', @$params) : '<none>') if $main::DEBUG==1;

        my $sth = $dbh->prepare($sql);
        if (! $sth) {
                if ($errorsAreFatal) {
                        elog ERROR, "Fatal error! SQL=$sql ERROR=".$dbh->errstr;
                }
                return ();
        }
        #-- an empty bind list is the same as calling execute() without arguments
        $sth->execute(@$params);
        if ($sth->err) {
                if ($errorsAreFatal) {
                        elog ERROR, "Fatal error! SQL=$sql ERROR=".$sth->errstr;
                }
                #-- nothing can be fetched from a statement that failed to execute
                return ();
        }
        my @row = $sth->fetchrow_array;
        $sth->finish;
        return @row;
}
#--
#-- SUB: dblinkExists
#-- Returns whether a DBLINK exists or not
#--
sub dblinkExists {
        my ($dbh, $dblinkname) = @_;

        my $sql = <<SQL;
SELECT count(*) as total
FROM public.pg_dblinks
WHERE dblinkname=?
SQL
        my $sth = $dbh->prepare($sql);
        if (! $sth || $dbh->err) {
                elog ERROR, $dbh->errstr;
        }
        $sth->execute($dblinkname);
        if ($sth->err) {
                elog ERROR, "Could not find public.pg_dblink! ";
        }
        my $row = $sth->fetchrow_hashref;
        $sth->finish;
        #-- count(*) always yields one row; compare it as a number
        return ($row && $row->{'total'} != 0) ? TRUE : FALSE;
}
#--
#-- SUB: dbiGetConnection
#-- Connects to a DBI datasource and returns a connection handle.
#-- Raises an ERROR when the connection cannot be established.
#--
sub dbiGetConnection {
        my ($datasource, $username, $password, $attributes) = @_;

        my $dbh = DBI->connect(
                $datasource
                , $username
                , $password
                , $attributes
                );
        #-- DBI->connect returns undef on failure; test the handle rather than a
        #-- possibly stale $DBI::errstr
        if (! $dbh) {
                #-- never echo the password back to the client or the server log
                my $shown_password = defined($password) ? '********' : '<none>';
                elog ERROR, <<ERR;
Could not connect to database
data source: $datasource
user: $username
password: $shown_password
dbh attributes:
$attributes

$DBI::errstr
ERR
        }
        return $dbh;
}

$BODY$
  LANGUAGE 'plperlu' VOLATILE;

ALTER FUNCTION public.create_dblink(dblinkname text, datasource text, username text, password text, attributes text) OWNER TO postgres;

COMMENT ON FUNCTION public.create_dblink(dblinkname text, datasource text, username text, password text, attributes text) IS $$
This function is part of PostgreSQL::Snapshots project.
This is the function that creates a dblink. First it tests the connection, then it adds a record to the public.pg_dblinks table.
$$;

------------------------------------------------------------------------------
-- FUNCTION: public.drop_dblink
--
-- Removes a previously created DATABASE LINK
------------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION public.drop_dblink(dblinkname text)
  RETURNS bool AS
$BODY$
#-- Removes a database link.
#--   dblinkname  name of the link (compared lower-cased)
#-- Deletes the matching row of public.pg_dblinks through a local superuser
#-- connection. Returns TRUE. Every failure is raised with elog ERROR.
use strict;
use DBI;
use constant TRUE => 1;
use constant FALSE => "";
#-- Function parameters
my ($dblinkname) = @_;
$dblinkname = lc($dblinkname);
#-- Set this to 1 for debugging messages
$main::DEBUG=0;
#-- Localhost superuser connection (database name and user are fixed here)
my $dbh_local = DBI->connect('dbi:Pg:dbname=labpeTeste', 'postgres', undef, {AutoCommit => 0});
if (! $dbh_local) {
        elog ERROR, "Could not connect to the local database ERROR=".$DBI::errstr;
}

#-- Let's make sure that the DBLINK exists
if (! dblinkExists($dbh_local, $dblinkname)) {
        $dbh_local->rollback;
        $dbh_local->disconnect;
        elog ERROR, "DBLink '$dblinkname' does not exist";
}

#-- Delete the DBLINK entry
my $sql = <<SQL;
DELETE FROM public.pg_dblinks WHERE dblinkname=?
SQL
my $sth = $dbh_local->prepare($sql);
$sth->execute($dblinkname);

if ($sth->err) {
        #-- keep the message, then release the local connection before raising
        my $reason = $sth->errstr;
        $dbh_local->rollback;
        $dbh_local->disconnect;
        elog ERROR, "Could not remove DBLink '$dblinkname' ERROR=".$reason;
}
elog NOTICE, "DBLink removed" if $main::DEBUG==1;
$dbh_local->commit;
$dbh_local->disconnect;
#-- All done. Let's return TRUE
return TRUE;

#--
#-- SUB: dblinkExists
#-- Returns whether a DBLINK exists or not
#--
sub dblinkExists {
        my ($dbh, $dblinkname) = @_;

        my $sql = <<SQL;
SELECT count(*) as total
FROM public.pg_dblinks
WHERE dblinkname=?
SQL
        my $sth = $dbh->prepare($sql);
        if (! $sth || $dbh->err) {
                elog ERROR, $dbh->errstr;
        }
        $sth->execute($dblinkname);
        if ($sth->err) {
                elog ERROR, "Could not find public.pg_dblink! ";
        }
        my $row = $sth->fetchrow_hashref;
        $sth->finish;
        #-- count(*) always yields one row; compare it as a number
        return ($row && $row->{'total'} != 0) ? TRUE : FALSE;
}
#--
#-- SUB: sqlLookup
#-- Returns the first row of a SQL query; errors are fatal
#--
sub sqlLookup {
        my ($dbh, $sql, @params) = @_;
        return sqlLookup2 ($dbh, $sql, \@params, TRUE);
}
#--
#-- SUB: sqlLookupSoft
#-- Same as sqlLookup, but errors are not raised
#--
sub sqlLookupSoft {
        my ($dbh, $sql, @params) = @_;
        return sqlLookup2 ($dbh, $sql, \@params, FALSE);
}

#--
#-- SUB: sqlLookup2
#-- Runs $sql with the bind values in @$params and returns the first row as a
#-- list. On failure raises an ERROR when $errorsAreFatal is true, otherwise
#-- returns an empty list.
#--
sub sqlLookup2 {
        my ($dbh, $sql, $params, $errorsAreFatal) = @_;

        elog NOTICE, "Lookup at SQL:$sql with ".(@$params ? join(',', @$params) : '<none>') if $main::DEBUG==1;

        my $sth = $dbh->prepare($sql);
        if (! $sth) {
                if ($errorsAreFatal) {
                        elog ERROR, "Fatal error! SQL=$sql ERROR=".$dbh->errstr;
                }
                return ();
        }
        #-- an empty bind list is the same as calling execute() without arguments
        $sth->execute(@$params);
        if ($sth->err) {
                if ($errorsAreFatal) {
                        elog ERROR, "Fatal error! SQL=$sql ERROR=".$sth->errstr;
                }
                #-- nothing can be fetched from a statement that failed to execute
                return ();
        }
        my @row = $sth->fetchrow_array;
        $sth->finish;
        return @row;
}

$BODY$
  LANGUAGE 'plperlu' VOLATILE;
ALTER FUNCTION public.drop_dblink(dblinkname text) OWNER TO postgres;
COMMENT ON FUNCTION public.drop_dblink(dblinkname text) IS $$
This function is part of PostgreSQL::Snapshots project.
This is the function that removes a dblink.
$$;
--
-- ATTENTION: This is free software.
--            See the LICENSE.txt file for license information
--

------------------------------------------------------------------------------
-- FUNCTION: public.create_snapshot
--
-- Creates a SNAPSHOT based on a LOCAL query or on a DBLINK query
-- TODO: ON PREBUILT TABLE -> may be child OO table ?
------------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION public.create_snapshot(schemaname text, snapshotname text, query text, dblink text, kind text, pbt_table text)
  RETURNS bool AS
$BODY$
use strict;
use DBI;
use constant TRUE => 1;
use constant FALSE => "";
#-- Main body of create_snapshot.
#--   schemaname, snapshotname  where the snapshot lives (both lower-cased)
#--   query                     query that defines the snapshot contents
#--   dblink                    name of a DBLINK, or empty for a local snapshot
#--   kind                      'FAST', 'COMPLETE' or 'FORCE' (case-insensitive)
#--   pbt_table                 name of a prebuilt table, or empty for none
#-- Creates (or validates) the snapshot table, then records the snapshot in
#-- public.pg_snapshots. Returns TRUE; failures are raised with elog ERROR.
#-- Function parameters
my ($schemaname, $snapshotname, $query, $dblinkname, $kindname, $pbt_table) = @_;
$schemaname = lc($schemaname);
$snapshotname = lc($snapshotname);
$dblinkname = lc($dblinkname);
$kindname = lc($kindname);
$pbt_table = lc($pbt_table);
#-- Set this to 1 for debugging messages
$main::DEBUG=0;
#-- Localhost superuser connection (database name and user are fixed here)
my $dbh_local = DBI->connect('dbi:Pg:dbname=labpeTeste', 'postgres', undef, {AutoCommit => 0});
if (! $dbh_local) {
        elog ERROR, "Could not connect to the local database ERROR=".$DBI::errstr;
}

#-- Map the refresh kind name to the one-letter code stored in pg_snapshots.kind
my %kind_codes = ('fast' => 'F', 'complete' => 'C', 'force' => 'R');
my $kind = $kind_codes{$kindname};
if (! defined $kind) {
        elog ERROR, "KIND of refresh should be 'FAST', 'COMPLETE' or 'FORCE' !";
}

#-- $pbt is the value stored in the catalog: the prebuilt table name, or ''
#-- when there is none.
my $has_pbt = ("$pbt_table" ne '');
my $pbt = $has_pbt ? $pbt_table : '';
#-- createLocalSnapshot/createRemoteSnapshot recognise "no prebuilt table" by
#-- the marker 'N'. A real table name can never collide with it because names
#-- are lower-cased above.
#-- NOTE(review): the previous code compared '' against 'N', so the
#-- "no prebuilt table" branch could never run -- confirm which value the
#-- refresh functions expect to find in pg_snapshots.pbt_table.
my $pbt_arg = $has_pbt ? $pbt : 'N';

#-- Test if the SNAPSHOT entry already exists
if (snapshotExists($dbh_local, $schemaname, $snapshotname)) {
        elog ERROR, "Snapshot '$snapshotname' already created";
}

if (! $has_pbt) {
        #-- Test if another OBJECT exists on the same place
        if (objectExists($dbh_local, $schemaname, $snapshotname)) {
                elog ERROR, "An object at '$schemaname.$snapshotname' already exists";
        }
} else {
        #-- Test if the PREBUILT TABLE exists
        if (! objectExists($dbh_local, $schemaname, $pbt_table)) {
                elog ERROR, "Prebuilt table '$schemaname.$pbt_table' does not exist!";
        }
}

if ("$dblinkname" eq '') {
        #-- create local
        elog NOTICE, 'Creating LOCAL snapshot' if $main::DEBUG==1;
        createLocalSnapshot($dbh_local, $schemaname, $snapshotname, $query, $pbt_arg);
} else {
        #-- create remote
        elog NOTICE, 'Creating REMOTE(DBLINK) snapshot' if $main::DEBUG==1;
        createRemoteSnapshot($dbh_local, $schemaname, $snapshotname, $query, $dblinkname, $pbt_arg);
}

#-- get the DBLINK id (undef for a local snapshot)
my $dblinkid = getDblinkId($dbh_local, $dblinkname);

#-- insert the snapshot into the catalog
my $sql = <<SQL;
INSERT INTO public.pg_snapshots(schemaname, snapshotname, query, dblinkid, kind, pbt_table)
VALUES (?, ?, ?, ?, ?, ?)
SQL
my $sth = $dbh_local->prepare($sql);
$sth->execute($schemaname, $snapshotname, $query, $dblinkid, $kind, $pbt);
if ($sth->err) {
        #-- keep the message, then release the local connection before raising
        my $reason = $sth->errstr;
        $dbh_local->rollback;
        $dbh_local->disconnect;
        elog ERROR, "Could not create snapshot entry. SQL=$sql ERROR=".$reason;
}
elog NOTICE, "Snapshot entry created" if $main::DEBUG==1;
$dbh_local->commit;
$dbh_local->disconnect;
#-- All done. Let's return TRUE
return TRUE;

#--
#-- SUB: sqlLookup
#-- Returns a one row result from a SQL query
#--
sub sqlLookup {
        #-- Strict flavour of sqlLookup2: any SQL error is raised as an ERROR
        my $handle = shift;
        my $statement = shift;
        my @binds = @_;
        return sqlLookup2($handle, $statement, \@binds, TRUE);
}
sub sqlLookupSoft {
        #-- Lenient flavour of sqlLookup2: SQL errors are not raised
        my $handle = shift;
        my $statement = shift;
        my @binds = @_;
        return sqlLookup2($handle, $statement, \@binds, FALSE);
}

#-- Runs $sql on $dbh with the bind values in @$params and returns the first
#-- row as a list. On failure raises an ERROR when $errorsAreFatal is true,
#-- otherwise returns an empty list.
sub sqlLookup2 {
        my ($dbh, $sql, $params, $errorsAreFatal) = @_;

        elog NOTICE, "Lookup at SQL:$sql with ".(@$params ? join(',', @$params) : '<none>') if $main::DEBUG==1;

        my $sth = $dbh->prepare($sql);
        if (! $sth) {
                if ($errorsAreFatal) {
                        elog ERROR, "Fatal error! SQL=$sql ERROR=".$dbh->errstr;
                }
                return ();
        }
        #-- an empty bind list is the same as calling execute() without arguments
        $sth->execute(@$params);
        if ($sth->err) {
                if ($errorsAreFatal) {
                        elog ERROR, "Fatal error! SQL=$sql ERROR=".$sth->errstr;
                }
                #-- nothing can be fetched from a statement that failed to execute
                return ();
        }
        my @row = $sth->fetchrow_array;
        $sth->finish;
        return @row;
}
#--
#-- SUB: snapshotExists
#-- Returns whether a snapshot exists or not
#--
sub snapshotExists {
        #-- $conn: DBI handle; $schema/$snapshot: key of the catalog entry
        my ($conn, $schema, $snapshot) = @_;

        #-- Count the catalog rows that carry this schema/name pair
        my $lookup = $conn->prepare(<<"END_SQL");
SELECT count(*) as total
FROM public.pg_snapshots
WHERE schemaname=?
        and snapshotname=?
END_SQL
        $lookup->execute($schema, $snapshot);
        elog ERROR, "Could not find public.pg_snapshots !!!" if $lookup->err;

        #-- true as soon as the count differs from 0
        my $hit = $lookup->fetchrow_hashref;
        return ($hit->{'total'} ne 0);
}
#--
#-- SUB: objectExists
#-- Returns whether an object exists or not
#--
#-- Returns true when pg_catalog.pg_class has an entry named $snapshotname in
#-- schema $schemaname. Raises an ERROR when the catalog query fails, so a
#-- failed query is never mistaken for "the object exists".
sub objectExists {
        my ($dbh, $schemaname, $snapshotname) = @_;

        #-- the schema filter needs a matching namespace row: plain inner join
        my $sql = <<SQL;
SELECT count(*) as total
FROM pg_catalog.pg_class c
        INNER JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname=? AND c.relname = ?
SQL
        my $sth = $dbh->prepare($sql);
        if (! $sth) {
                elog ERROR, "Could not query pg_catalog.pg_class ERROR=".$dbh->errstr;
        }
        $sth->execute($schemaname, $snapshotname);
        if ($sth->err) {
                elog ERROR, "Could not query pg_catalog.pg_class ERROR=".$sth->errstr;
        }
        my $row = $sth->fetchrow_hashref;
        $sth->finish;
        #-- count(*) always yields one row; compare it as a number
        return ($row && $row->{total} != 0) ? TRUE : FALSE;
}
#--
#-- SUB: getDblinkId
#-- Maps a DBLINK name to a DBLINK id
#--
sub getDblinkId {
        #-- $conn: DBI handle; $linkname: name of the DBLINK to resolve
        my ($conn, $linkname) = @_;

        #-- An empty link name has no id
        return undef if "$linkname" eq '';

        #-- Fetch the id of the link with that name
        my $lookup = $conn->prepare('SELECT dblinkid FROM public.pg_dblinks WHERE dblinkname=?');
        $lookup->execute($linkname);
        my $found = $lookup->fetchrow_hashref;

        return $found->{'dblinkid'};
}
#--
#-- SUB: createLocalSnapshot
#-- Creates a snapshot based on a local database query
#--
#-- Creates the table behind a snapshot that reads from the local database.
#--   $dbh           local DBI connection
#--   $schemaname    schema of the snapshot
#--   $snapshotname  snapshot (table) name
#--   $query         query that defines the snapshot
#--   $pbt           'N' (or empty) when there is no prebuilt table, otherwise
#--                  the prebuilt table name
#-- Without a prebuilt table an empty table shaped like the query is created.
#-- With one, the query columns are selected from the existing table and both
#-- statement handles are handed to validateModifyPrebuiltTable.
#-- NOTE(review): schema, table and query text are interpolated into the SQL;
#-- callers must be trusted.
sub createLocalSnapshot {
        #-- Function parameters
        my ($dbh, $schemaname, $snapshotname, $query, $pbt) = @_;

        #-- Local variables
        my $sql;

        if ($pbt eq 'N' || $pbt eq '') {
                #-- Creates an empty table based on the query
                $sql = <<SQL;
CREATE TABLE $schemaname.$snapshotname AS
        SELECT query.*
        FROM ($query) query
        WHERE 1=0
SQL
                #-- spi_exec_query returns a plain hash reference, not an object:
                #-- read its status key, never call methods on it
                my $rv = spi_exec_query($sql);
                #-- presumably newer servers report CREATE TABLE AS as a utility
                #-- statement -- accept both status codes
                if ($rv->{status} eq 'SPI_OK_SELINTO' || $rv->{status} eq 'SPI_OK_UTILITY') {
                        elog NOTICE, "Snapshot placeholder created" if $main::DEBUG==1;
                } else {
                        elog ERROR, "Could not create snapshot placeholder: $sql [$rv->{status}]";
                }
                return;
        }

        #-- ON PREBUILT TABLE: the existing table takes the place of the snapshot
        my $pbt_name = $pbt;

        #-- execute the query WHERE 1=0 to learn its columns
        $sql = <<SQL;
SELECT query.*
FROM ($query) query
WHERE 1=0
SQL
        my $sth = $dbh->prepare($sql);
        $sth->execute;
        if ($sth->err) {
                elog ERROR, "Could not execute query: $sql [".$sth->errstr."]";
        }

        my @cols = ();
        for ( my $i = 0 ; $i < $sth->{NUM_OF_FIELDS} ; $i++ ) {
                push @cols, $sth->{NAME}->[$i];
        }
        my $columns = join(',', @cols);

        #-- execute a query WHERE 1=0 on the existing table
        $sql = <<SQL;
SELECT $columns
FROM $schemaname.$pbt_name
WHERE 1=0
SQL
        my $sth1 = $dbh->prepare($sql);
        $sth1->execute;
        if ($sth1->err) {
                elog ERROR, "Could not execute query: $sql [".$sth1->errstr."]";
        }
        validateModifyPrebuiltTable($dbh, $schemaname, $pbt_name, $sth, $sth1);
        $sth1->finish();
        $dbh->commit;
        $sth->finish();
}
#--
#-- SUB: createRemoteSnapshot
#-- Creates a snapshot based on a remote database query
#--
#-- Creates the table behind a snapshot that reads through a DBLINK.
#--   $dbh           local DBI connection
#--   $schemaname    schema of the snapshot
#--   $snapshotname  snapshot (table) name
#--   $query         query to run on the remote database
#--   $dblinkname    name of the DBLINK in public.pg_dblinks
#--   $pbt           'N' (or empty) when there is no prebuilt table, otherwise
#--                  the prebuilt table name
#-- Without a prebuilt table the remote column metadata is mapped to
#-- PostgreSQL types and a new table is created. With one, the query columns
#-- are selected from the existing table and both statement handles are handed
#-- to validateModifyPrebuiltTable.
#-- NOTE(review): schema, table and column names are interpolated into the
#-- SQL; callers must be trusted.
sub createRemoteSnapshot {
        #-- Function parameters
        my ($dbh, $schemaname, $snapshotname, $query, $dblinkname, $pbt) = @_;

        #-- Local variables
        my $sql;
        my $sth;

        #-- Test if the DBLINK entry exists
        $sql = 'SELECT dblinkname, datasource, username, password, attributes FROM public.pg_dblinks WHERE dblinkname=?';
        $sth = $dbh->prepare($sql);
        $sth->execute($dblinkname);
        if ($sth->err) {
                elog ERROR, "Table 'public.pg_dblinks' does not exist!";
        }

        #-- Fetch the dblink info
        my $dblink = $sth->fetchrow_hashref;
        $sth->finish;
        if (! $dblink) {
                elog ERROR, "DBLink '$dblinkname' does not exist!";
        }

        #-- Convert the attributes to a hashtable
        #-- NOTE(review): the stored text is executed as Perl code; it is only as
        #-- trustworthy as whoever may write to public.pg_dblinks
        my $attr_href = eval($dblink->{'attributes'});

        #-- Make the remote database connection
        my $dbh_remote = dbiGetConnection(
                $dblink->{'datasource'}
                , $dblink->{'username'}
                , $dblink->{'password'}
                , $attr_href
                );

        #-- holds each column definition of the new table
        my @cols;

        #-- retrieve the structure of the snapshot query (empty query)
        $sql = getQueryWithNoRecords($dbh_remote, $query);
        $sth = $dbh_remote->prepare($sql);
        if (! $sth) {
                elog ERROR, "Could not prepare remote query: $sql [".$dbh_remote->errstr."]";
        }
        $sth->execute;
        if ($sth->err) {
                elog ERROR, "Could not execute remote query: $sql [".$sth->errstr."]";
        }

        if ($pbt eq 'N' || $pbt eq '') {
                #-- Loop through all columns and retrieve the name and type of each one
                #-- Also maps the SQL Perl types to PostgreSQL ones
                #-- Place this info on @cols
                for ( my $i = 0 ; $i < $sth->{NUM_OF_FIELDS} ; $i++ ) {
                        my $line = $sth->{NAME}->[$i];
                        my $datatype = getUnifiedSqlType($sth->{TYPE}->[$i]);
                        my $precision = $sth->{PRECISION}->[$i];
                        my $scale = $sth->{SCALE}->[$i];
                        my $isnullable = $sth->{NULLABLE}->[$i];
                        my $nullable = '';
                        if (! $isnullable) {
                                $nullable = 'NOT NULL';
                        }

                        #-- the order of the tests matters: the first matching code wins
                        if ( $datatype == DBI::SQL_INTEGER ) {
                                $line .= " INTEGER $nullable";
                        } elsif ( $datatype == DBI::SQL_SMALLINT ) {
                                $line .= " SMALLINT $nullable";
                        } elsif ( $datatype == DBI::SQL_NUMERIC ) {
                                $line .= " NUMERIC($precision, $scale) $nullable";
                        } elsif ( $datatype == DBI::SQL_BOOLEAN ) {
                                $line .= " BOOLEAN $nullable";
                        } elsif ( $datatype == DBI::SQL_CHAR ) {
                                $line .= " CHAR($precision) $nullable";
                        } elsif ( $datatype == DBI::SQL_VARCHAR ) {
                                $line .= " VARCHAR($precision) $nullable";
                        } elsif ( $datatype == DBI::SQL_CLOB ) {
                                $line .= " VARCHAR $nullable";
                        } elsif ( $datatype == DBI::SQL_DATE ) {
                                $line .= " DATE $nullable";
                        } elsif ( $datatype == DBI::SQL_TIME ) {
                                $line .= " TIME $nullable";
                        } elsif ( $datatype == DBI::SQL_TIMESTAMP ) {
                                $line .= " TIMESTAMP $nullable";
                        } elsif ( $datatype == DBI::SQL_INTERVAL ) {
                                $line .= " INTERVAL $nullable";
                        } elsif ( $datatype == DBI::SQL_BLOB ) {
                                $line .= " BYTEA $nullable";
                        } else { #--UNKNOWN
                                elog WARNING, "Unknown type '$datatype' mapped to 'TEXT'!";
                                $line .= " TEXT $nullable";
                        }
                        push @cols, $line;
                }
                $sth->finish;

                #-- Makes the CREATE TABLE SQL statement
                $sql = <<SQL;
CREATE TABLE $schemaname.$snapshotname (
  @{[ join("\n, ", @cols) ]}
)
SQL
                elog NOTICE, $sql if $main::DEBUG==1;
                #-- spi_exec_query returns a plain hash reference, not an object:
                #-- keep it apart from the DBI statement handle
                my $rv = spi_exec_query($sql);
                if ($rv->{status} eq 'SPI_OK_UTILITY') {
                        elog NOTICE, "Snapshot placeholder created" if $main::DEBUG==1;
                } else {
                        elog ERROR, "Could not create snapshot placeholder: $sql [$rv->{status}]";
                }
        } else {
                #-- ON PREBUILT TABLE: the existing table takes the place of the snapshot
                my $pbt_name = $pbt;

                for ( my $i = 0 ; $i < $sth->{NUM_OF_FIELDS} ; $i++ ) {
                        push @cols, $sth->{NAME}->[$i];
                }
                my $columns = join(',', @cols);

                #-- execute a query WHERE 1=0 on the existing table
                $sql = <<SQL;
SELECT $columns
FROM $schemaname.$pbt_name
WHERE 1=0
SQL
                my $sth1 = $dbh->prepare($sql);
                $sth1->execute;
                if ($sth1->err) {
                        elog ERROR, "Could not execute query: $sql [".$sth1->errstr."]";
                }
                validateModifyPrebuiltTable($dbh, $schemaname, $pbt_name, $sth, $sth1);
                $sth1->finish();
                $dbh->commit;
                $sth->finish();
        }
        #-- Disconnects from the remote database
        $dbh_remote->disconnect;
}

#-- Dependencies
#--
#-- SUB: dbiGetConnection
#-- Connects to a DBI datasource and returns a connection handle
#--
#-- Opens a DBI connection and returns the handle.
#--   $datasource  DBI data source string
#--   $username    user name
#--   $password    password
#--   $attributes  hash reference with the DBI connection attributes
#-- Raises an ERROR when the connection cannot be established.
sub dbiGetConnection {
        my ($datasource, $username, $password, $attributes) = @_;

        my $dbh = DBI->connect(
                $datasource
                , $username
                , $password
                , $attributes
                );
        #-- DBI->connect returns undef on failure; test the handle rather than a
        #-- possibly stale $DBI::errstr
        if (! $dbh) {
                #-- never echo the password back to the client or the server log
                my $shown_password = defined($password) ? '********' : '<none>';
                elog ERROR, <<ERR;
Could not connect to database
data source: $datasource
user: $username
password: $shown_password
dbh attributes:
$attributes

$DBI::errstr
ERR
        }
        return $dbh;
}
#--
#-- SUB: getQueryWithNoRecords
#-- Places a FALSE filter into a query in order to retrieve no records (query structure only)
#--
#-- Returns a variant of $query that yields no rows, so that only its
#-- structure (column names and types) can be read.
#--   $dbh    connection the query will run on
#--   $query  original query text
#-- First choice is to wrap the query in a subquery filtered by 1=0. If the
#-- database refuses to prepare that, the filter is spliced into the query
#-- text itself. Raises an ERROR if neither form can be prepared.
sub getQueryWithNoRecords {
        #-- Function parameters
        my ($dbh, $query) = @_;

        my $sth;
        my $sql;

        #-- some databases do not support subqueries, so we may need to modify
        #-- the original query and insert "1=0 AND " or "WHERE 1=0" as needed.
        #-- Of course, we try "SELECT * FROM ($query) WHERE 1=0" first.
        #-- Double quotes are required here so that $query is interpolated.
        $sql = "SELECT * FROM ($query) query WHERE 1=0";
        elog NOTICE, $sql if $main::DEBUG == 1;
        $sth = $dbh->prepare($sql);
        if (! $sth || $dbh->err) {
                $query =~ tr/\n/ /;
                my $lc_query = lc($query);
                my $wherepos = rindex($lc_query, " where ");
                if ($wherepos > 0) {
                        #-- NOTE(review): an OR in the original condition is not
                        #-- parenthesised, so it can still let rows through
                        $sql = substr($query, 0, $wherepos) . ' WHERE 1=0 AND ' . substr($query, $wherepos + length(" where "));
                } else {
                        #-- no WHERE clause: the filter goes in front of the first
                        #-- GROUP BY / ORDER BY clause, or at the end when the query
                        #-- has neither (rindex returns -1 for "not found")
                        my $insert_at = length($query);
                        foreach my $pos (rindex($lc_query, " group by "), rindex($lc_query, " order by ")) {
                                if ($pos >= 0 && $pos < $insert_at) {
                                        $insert_at = $pos;
                                }
                        }
                        $sql = substr($query, 0, $insert_at) . ' WHERE 1=0 ' . substr($query, $insert_at);
                }

                elog NOTICE, $sql if $main::DEBUG == 1;
                $sth = $dbh->prepare($sql);
                if (! $sth || $dbh->err) {
                        my $err = <<ERR2;
Cannot prepare

$sql

$DBI::errstr
ERR2
                        elog ERROR, $err;
                }
        }
        return $sql;
}
#--
#-- SUB: getUnifiedSqlType
#-- Maps a set of SQL types into a single SQL type
#--
sub getUnifiedSqlType {
        my ($dbiType) = @_;

        #--------------------------------
        #-- SQL datatype codes
        #-- SQL_GUID                         (-11)
        #-- SQL_WLONGVARCHAR                 (-10)
        #-- SQL_WVARCHAR                      (-9)
        #-- SQL_WCHAR                         (-8)
        #-- SQL_BIT                           (-7)
        #-- SQL_TINYINT                       (-6)
        #-- SQL_BIGINT                        (-5)
        #-- SQL_LONGVARBINARY                 (-4)
        #-- SQL_VARBINARY                     (-3)
        #-- SQL_BINARY                        (-2)
        #-- SQL_LONGVARCHAR                   (-1)
        #-- SQL_UNKNOWN_TYPE                    0
        #-- SQL_ALL_TYPES                       0
        #-- SQL_CHAR                            1
        #-- SQL_NUMERIC                         2
        #-- SQL_DECIMAL                         3
        #-- SQL_INTEGER                         4
        #-- SQL_SMALLINT                        5
        #-- SQL_FLOAT                           6
        #-- SQL_REAL                            7
        #-- SQL_DOUBLE                          8
        #-- SQL_DATETIME                        9
        #-- SQL_DATE                            9
        #-- SQL_INTERVAL                       10
        #-- SQL_TIME                           10
        #-- SQL_TIMESTAMP                      11
        #-- SQL_VARCHAR                        12
        #-- SQL_BOOLEAN                        16
        #-- SQL_UDT                            17
        #-- SQL_UDT_LOCATOR                    18
        #-- SQL_ROW                            19
        #-- SQL_REF                            20
        #-- SQL_BLOB                           30
        #-- SQL_BLOB_LOCATOR                   31
        #-- SQL_CLOB                           40
        #-- SQL_CLOB_LOCATOR                   41
        #-- SQL_ARRAY                          50
        #-- SQL_ARRAY_LOCATOR                  51
        #-- SQL_MULTISET                       55
        #-- SQL_MULTISET_LOCATOR               56
        #-- SQL_TYPE_DATE                      91
        #-- SQL_TYPE_TIME                      92
        #-- SQL_TYPE_TIMESTAMP                 93
        #-- SQL_TYPE_TIME_WITH_TIMEZONE        94
        #-- SQL_TYPE_TIMESTAMP_WITH_TIMEZONE   95
        #-- SQL_INTERVAL_YEAR                 101
        #-- SQL_INTERVAL_MONTH                102
        #-- SQL_INTERVAL_DAY                  103
        #-- SQL_INTERVAL_HOUR                 104
        #-- SQL_INTERVAL_MINUTE               105
        #-- SQL_INTERVAL_SECOND               106
        #-- SQL_INTERVAL_YEAR_TO_MONTH        107
        #-- SQL_INTERVAL_DAY_TO_HOUR          108
        #-- SQL_INTERVAL_DAY_TO_MINUTE        109
        #-- SQL_INTERVAL_DAY_TO_SECOND        110
        #-- SQL_INTERVAL_HOUR_TO_MINUTE       111
        #-- SQL_INTERVAL_HOUR_TO_SECOND       112
        #-- SQL_INTERVAL_MINUTE_TO_SECOND     113
        #--------------------------------

#--PG_BOOL PG_BYTEA PG_CHAR PG_INT8 PG_INT2 PG_INT4 PG_TEXT PG_OID
#--PG_FLOAT4 PG_FLOAT8 PG_ABSTIME PG_RELTIME PG_TINTERVAL PG_BPCHAR
#--PG_VARCHAR PG_DATE PG_TIME PG_DATETIME PG_TIMESPAN PG_TIMESTAMP
        if (($dbiType == DBI::SQL_INTEGER)) {
                return DBI::SQL_INTEGER;
        } elsif (($dbiType == DBI::SQL_SMALLINT)) {
                return DBI::SQL_SMALLINT;
        } elsif (($dbiType == DBI::SQL_TYPE_TIME_WITH_TIMEZONE)
                || ($dbiType == DBI::SQL_TYPE_TIMESTAMP_WITH_TIMEZONE)
                || ( $dbiType == DBI::SQL_DATETIME )
                || ( $dbiType == DBI::SQL_TIMESTAMP )
                || ( $dbiType == DBI::SQL_TYPE_TIMESTAMP ) ) { #--TIMESTAMP
                return DBI::SQL_TIMESTAMP;
        } elsif (($dbiType == DBI::SQL_NUMERIC )
                || ($dbiType == DBI::SQL_DECIMAL )
                || ($dbiType == DBI::SQL_FLOAT )
                || ($dbiType == DBI::SQL_REAL )
                || ($dbiType == DBI::SQL_DOUBLE )
#-- || ($dbiType == DBI::SQL_BIGINT )
                || ($dbiType == DBI::SQL_TINYINT )) { #--NUMERIC
                return DBI::SQL_NUMERIC;
        } elsif ( ( $dbiType == DBI::SQL_BOOLEAN )
                || ( $dbiType == DBI::SQL_BIT ) ) { #--BOOLEAN
                return DBI::SQL_BOOLEAN;
        } elsif ( ($dbiType == DBI::SQL_CHAR )
                || ( $dbiType == DBI::SQL_WCHAR ) ) { #--CHAR
                return DBI::SQL_CHAR;
        } elsif ( ($dbiType == DBI::SQL_VARCHAR )
                || ( $dbiType == DBI::SQL_WVARCHAR )) { #--VARCHAR
                return DBI::SQL_VARCHAR;
        } elsif ( ( $dbiType == DBI::SQL_LONGVARCHAR )
                || ( $dbiType == DBI::SQL_WLONGVARCHAR )
                || ( $dbiType == DBI::SQL_CLOB ) ) { #--CLOB
                return DBI::SQL_CLOB;
        } elsif ( ( $dbiType == DBI::SQL_DATE )
                || ( $dbiType == DBI::SQL_TYPE_DATE ) ) { #--DATE
                return DBI::SQL_DATE;
        } elsif ( ( $dbiType == DBI::SQL_TIME )
                || ( $dbiType == DBI::SQL_TYPE_TIME ) ) { #--TIME
                return DBI::SQL_TIME;
        } elsif ( ( $dbiType == DBI::SQL_INTERVAL )
                || ( $dbiType == DBI::SQL_INTERVAL_YEAR )
                || ( $dbiType == DBI::SQL_INTERVAL_MONTH )
                || ( $dbiType == DBI::SQL_INTERVAL_DAY )
                || ( $dbiType == DBI::SQL_INTERVAL_HOUR )
                || ( $dbiType == DBI::SQL_INTERVAL_MINUTE )
                || ( $dbiType == DBI::SQL_INTERVAL_SECOND )
                || ( $dbiType == DBI::SQL_INTERVAL_YEAR_TO_MONTH )
                || ( $dbiType == DBI::SQL_INTERVAL_DAY_TO_HOUR )
                || ( $dbiType == DBI::SQL_INTERVAL_DAY_TO_MINUTE )
                || ( $dbiType == DBI::SQL_INTERVAL_DAY_TO_SECOND )
                || ( $dbiType == DBI::SQL_INTERVAL_HOUR_TO_MINUTE )
                || ( $dbiType == DBI::SQL_INTERVAL_HOUR_TO_SECOND )
                || ( $dbiType == DBI::SQL_INTERVAL_MINUTE_TO_SECOND ) ) { #--INTERVAL
                return DBI::SQL_INTERVAL;
        } elsif ( ( $dbiType == DBI::SQL_BINARY )
                || ( $dbiType == DBI::SQL_VARBINARY )
                || ( $dbiType == DBI::SQL_LONGVARBINARY )
                || ( $dbiType == DBI::SQL_BLOB ) ) { #--BLOB
                return $dbiType == DBI::SQL_BLOB;
        } else {
                return DBI::SQL_VARCHAR; #-- default: VARCHAR
        }
}
#--
#-- SUB: validateModifyPrebuiltTable
#-- Validates a prebuilt table against the snapshot source query and then
#-- prepares that table to hold snapshot rows.
#--
#-- Parameters:
#--   $dbh          database handle used for every statement issued here
#--   $schemaname   schema of the table being validated and altered
#--   $snapshotname name of the table being validated and altered
#--   $sth_source   statement handle whose column metadata describes the source
#--   $sth_target   statement handle whose column metadata describes the target
#--
#-- Raises (elog ERROR) when the target has fewer columns than the source,
#-- when a column type, precision or scale is incompatible, or when the pbt$
#-- column or the trigger function cannot be set up. Nullability differences,
#-- an already existing pbt$ column and index/trigger creation failures are
#-- reported as warnings only.
#--
sub validateModifyPrebuiltTable {
        #-- Function parameters
        my ($dbh, $schemaname, $snapshotname, $sth_source, $sth_target) = @_;

        #-- Local variables
        my $sql;
        my $sth;

        #-- Columns are compared by position, so the target must offer at least
        #-- as many columns as the source; otherwise the checks below would run
        #-- against undefined metadata and report misleading errors.
        if (defined($sth_target->{NUM_OF_FIELDS})
                && ($sth_target->{NUM_OF_FIELDS} < $sth_source->{NUM_OF_FIELDS})) {
                elog ERROR, "Prebuilt table $schemaname.$snapshotname has fewer columns (".$sth_target->{NUM_OF_FIELDS}.") than the source query (".$sth_source->{NUM_OF_FIELDS}.")";
        }

        #-- compare column types, precision, scale and nullability by position
        for ( my $i = 0 ; $i < $sth_source->{NUM_OF_FIELDS} ; $i++ ) {
                my $columnName = $sth_source->{NAME}->[$i];
                if (getUnifiedSqlType($sth_source->{TYPE}->[$i]) ne getUnifiedSqlType($sth_target->{TYPE}->[$i])) {
                        elog ERROR, "Types of column $columnName mismatch";
                }
                if ($sth_source->{PRECISION}->[$i] > $sth_target->{PRECISION}->[$i]) {
                        elog ERROR, "Precision of column $columnName on source exceeds precision on target";
                }
                if ($sth_source->{SCALE}->[$i] > $sth_target->{SCALE}->[$i]) {
                        elog ERROR, "Scale of column $columnName on source exceeds scale on target";
                }
                if ( $sth_source->{NULLABLE}->[$i] ne $sth_target->{NULLABLE}->[$i] ) {
                        my $n;
                        my $n1;
                        if (! $sth_source->{NULLABLE}->[$i]) {
                                $n = 'NOT NULLABLE';
                                $n1 = 'NULLABLE';
                        } else {
                                $n = 'NULLABLE';
                                $n1 = 'NOT NULLABLE';
                        }
                        elog WARNING, "Column $columnName is $n on source but is $n1 on target";
                }
        }
        elog NOTICE, "Prebuilt table matches source query." if $main::DEBUG==1;

        #-- tests if the pbt$ column exists: the probe query fails when it does not
        $sql = <<SQL;
SELECT pbt\$
FROM $schemaname.$snapshotname
WHERE 1=0
SQL
        $sth = $dbh->prepare($sql);
        $sth->execute();
        my $err = $sth->err;
        $sth->finish();
        #-- ends the transaction the probe ran in before any DDL is issued
        $dbh->commit;
        if ($err) {
                #-- column pbt$ does not exist
                $sql = <<SQL;
ALTER TABLE $schemaname.$snapshotname
ADD COLUMN pbt\$ NUMERIC default 0
SQL
                $dbh->do($sql);
                if ($dbh->err) {
                        elog ERROR, "Could not add the 'pbt\$' column to $schemaname.$snapshotname. SQL=$sql. [".$dbh->errstr.']';
                }
        } else {
                elog WARNING, "Column pbt\$ already exists on $schemaname.$snapshotname";
        }

        #-- Rows that were already in the table are marked with pbt$ = 0
        $sql = <<SQL;
UPDATE $schemaname.$snapshotname
SET pbt\$=0
WHERE pbt\$ IS NULL
SQL
        $dbh->do($sql);
        if ($dbh->err) {
                elog ERROR, "Could not set 'pbt\$' column on existing rows to 0 on $schemaname.$snapshotname. SQL=$sql. [".$dbh->errstr.']';
        }
        $dbh->commit;

        $sql = <<SQL;
ALTER TABLE $schemaname.$snapshotname
ALTER COLUMN pbt\$ SET NOT NULL
SQL
        $dbh->do($sql);
        if ($dbh->err) {
                elog ERROR, "Could not modify the 'pbt\$' column to NOT NULL on $schemaname.$snapshotname. SQL=$sql. [".$dbh->errstr.']';
        }

        #-- A missing index is tolerated, hence only a warning on failure
        $sql = <<SQL;
CREATE INDEX ${snapshotname}_pbt\$_ix
ON $schemaname.$snapshotname (pbt\$)
SQL
        $dbh->do($sql);
        if ($dbh->err) {
                elog WARNING, "Could not create index on 'pbt\$' column on $schemaname.$snapshotname. SQL=$sql. [".$dbh->errstr.']';
        }
        $dbh->commit;

        #-- Trigger function rejecting manual changes to rows whose pbt$ is not 0
        $sql = <<SQL;
CREATE OR REPLACE FUNCTION $schemaname.${snapshotname}_pbt\$_trgfn()
RETURNS trigger AS
\$BODYFN\$
BEGIN
        IF (TG_OP = 'UPDATE') THEN
                IF ((OLD.pbt\$ <> 0) OR (NEW.pbt\$ <> 0)) THEN
                        RAISE EXCEPTION 'Invalid operation on snapshot-based data!';
                END IF;
        ELSIF (TG_OP = 'DELETE') THEN
                IF (OLD.pbt\$ <> 0) THEN
                        RAISE EXCEPTION 'Invalid operation on snapshot-based data!';
                END  IF;
        ELSIF (TG_OP = 'INSERT') THEN
                IF ((NEW.pbt\$ IS NOT NULL) AND (NEW.pbt\$ <> 0)) THEN
                        RAISE EXCEPTION 'You cannot insert snapshot-based data manually!';
                END IF;
        END IF;
        RETURN NULL;
END;
\$BODYFN\$
LANGUAGE 'plpgsql' VOLATILE;
SQL

        elog NOTICE, $sql if $main::DEBUG==1;

        $dbh->do($sql);
        if ($dbh->err) {
                elog ERROR, "Could not create the prebuilt table trigger function for table '$schemaname.$snapshotname'. ERR='".$dbh->errstr."' SQL='$sql'";
        }

        #-- Create TRIGGER on the prebuilt table
        elog NOTICE, "Creating trigger ${snapshotname}_pbt\$_trg on $schemaname.${snapshotname}" if $main::DEBUG==1;
        $sql = <<SQL;
CREATE TRIGGER ${snapshotname}_pbt\$_trg AFTER INSERT OR UPDATE OR DELETE
ON $schemaname.${snapshotname} FOR EACH ROW
EXECUTE PROCEDURE $schemaname.${snapshotname}_pbt\$_trgfn ()
SQL
        $dbh->do($sql);
        if ($dbh->err) {
                elog WARNING, "Could not create the prebuilt table trigger on table '$schemaname.$snapshotname'. ERR='".$dbh->errstr."' SQL='$sql'";
        }
}

$BODY$
  LANGUAGE 'plperlu' VOLATILE;
ALTER FUNCTION public.create_snapshot(schemaname text, snapshotname text, query text, dblink text, kind text, pbt_table text) OWNER TO postgres;
COMMENT ON FUNCTION public.create_snapshot(schemaname text, snapshotname text, query text, dblink text, kind text, pbt_table text) IS $$
This function is part of PostgreSQL::Snapshots project.
This is the function that creates a snapshot. It only creates the snapshot placeholder with the same structure as the query result. The snapshot needs to be refreshed to become filled.
$$;
--
-- ATTENTION: This is a free software.
--            View the LICENSE.txt file for license information
--

------------------------------------------------------------------------------
-- FUNCTION: public.drop_snapshot
--
-- Removes a SNAPSHOT previously created with create_snapshot
------------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION public.drop_snapshot(schemaname text, snapshotname text)
  RETURNS bool AS
$BODY$
use strict;
use DBI;
use constant TRUE => 1;
use constant FALSE => "";
#--
#-- Main body of drop_snapshot(schemaname, snapshotname).
#-- Unregisters the snapshot from the master's snapshot log (when one exists),
#-- removes its entry from public.pg_snapshots and then either cleans up the
#-- prebuilt table it was attached to or drops the snapshot table itself.
#-- Returns TRUE; every failure that must stop the removal is raised through
#-- elog ERROR.
#--
#-- Function parameters
my ($schemaname, $snapshotname) = @_;
$schemaname = lc($schemaname);
$snapshotname = lc($snapshotname);
#-- Set this to 1 for debugging messages
$main::DEBUG=0;
#-- Localhost superuser connection
my $dbh_local = DBI->connect('dbi:Pg:dbname=labpeTeste', 'postgres', undef, {AutoCommit => 0});
if (! $dbh_local) {
        elog ERROR, "Could not connect to the local database: $DBI::errstr";
}
my $dbh_remote;
#-- True only when $dbh_remote is a separate connection opened through a DBLINK
my $is_remote = 0;
my $sql;
my $sth;
my $masterschema;
my $mastername;
my $snapshot;
my $dblink;
my $attr_href;
my $affected;

if (! snapshotExists($dbh_local, $schemaname, $snapshotname)) {
        elog ERROR, "Snapshot '$schemaname.$snapshotname' does not exist";
}

$snapshot = getSnapshot($dbh_local, $schemaname, $snapshotname);
if ("$snapshot->{'dblinkid'}" ne '') {
        $dblink = getDblinkById($dbh_local, $snapshot->{'dblinkid'});
        if ($dblink) {
                #--create the remote database connection
                #-- NOTE(review): the attributes text read from public.pg_dblinks is
                #-- evaluated as Perl code; whoever can write that table can run
                #-- arbitrary code here.
                $attr_href = eval($dblink->{'attributes'});
                $dbh_remote = dbiGetConnection(
                        $dblink->{'datasource'}
                        , $dblink->{'username'}
                        , $dblink->{'password'}
                        , $attr_href
                        );
                $is_remote = 1;
        }
}
if (! $is_remote) {
        #-- No DBLINK (or its row is gone): the master lives on the local database
        $dbh_remote = $dbh_local;
}

($masterschema, $mastername) = retrieveMasterForSnapshot($dbh_remote, $snapshot);
if (snapshotLogExists($dbh_remote, $masterschema, $mastername)) {
        call_driver_function($dbh_remote, 'snapshot_do', 'UNREGISTER', $masterschema, $mastername, $snapshot->{'snapid'});
}
#-- Only close a connection that is really remote; the local handle is still
#-- needed for everything below.
if ($is_remote) {
        $dbh_remote->commit;
        $dbh_remote->disconnect;
}
#-- Then delete the SNAPSHOT entry in public.pg_snapshots
$sql = <<SQL;
DELETE FROM public.pg_snapshots
WHERE schemaname=?
        AND snapshotname=?
SQL
$sth = $dbh_local->prepare($sql);
$sth->execute(($schemaname, $snapshotname));
if (! $sth->err) {
        elog NOTICE, "Snapshot entry removed" if $main::DEBUG==1;
} else {
        elog ERROR, "Could not remove Snapshot entry '$schemaname.$snapshotname'. ".$sth->errstr;
}

if ("$snapshot->{'pbt_table'}" ne '') {
        my $pbt_table = $snapshot->{'pbt_table'};

        #-- The guard trigger would reject the DELETE below, so switch it off first
        setTriggerStatus($dbh_local, $snapshot->{'schemaname'}, $snapshot->{'pbt_table'}, "$snapshot->{'pbt_table'}_pbt\$_trg", FALSE);

        #-- Remove all rows based on this snapshot
        $sql = <<SQL;
DELETE FROM $schemaname.${pbt_table}
WHERE pbt\$ = ?
SQL
        $sth = $dbh_local->prepare($sql);
        $affected = $sth->execute(($snapshot->{'snapid'}));
        if ($sth->err) {
                elog ERROR, "Could not delete snapshot based rows from prebuilt table. SQL=$sql.".$sth->errstr;
        }
        $dbh_local->commit;
        elog NOTICE, "Prebuilt table snapshot rows deleted: $affected" if $main::DEBUG==1;

        #-- Find out whether we are the last snapshot using this prebuilt table
        $sql = <<SQL;
SELECT count(*) as total
FROM public.pg_snapshots
WHERE schemaname=?
        AND pbt_table=?
SQL
        my ($total) = sqlLookup($dbh_local, $sql, ($schemaname, $pbt_table));
        if (defined($total) && $total == 0) {
                elog NOTICE, "We are the last snapshot on this prebuilt table!" if $main::DEBUG==1;
                #-- Drop pbt$ column index (schema-qualified: the index lives in the
                #-- table's schema, which need not be on the search path)
                $sql = <<SQL;
DROP INDEX $schemaname.${pbt_table}_pbt\$_ix
SQL
                $dbh_local->do($sql);
                if ($dbh_local->err) {
                        elog WARNING, "Could not drop the prebuilt table pbt\$ column index on table '$schemaname.$pbt_table'. ERR='".$dbh_local->errstr."' SQL='$sql'";
                }

                elog NOTICE, "Prebuilt table pbt\$ column index dropped." if $main::DEBUG==1;

                #-- Drop pbt$ column on Prebuilt table
                $sql = <<SQL;
ALTER TABLE $schemaname.${pbt_table}
DROP COLUMN pbt\$
SQL
                $dbh_local->do($sql);
                if ($dbh_local->err) {
                        elog ERROR, "Could not drop the prebuilt table pbt\$ column on table '$schemaname.$pbt_table'. ERR='".$dbh_local->errstr."' SQL='$sql'";
                }

                elog NOTICE, "Prebuilt table pbt\$ column dropped." if $main::DEBUG==1;

                #-- Drop triggers on Prebuilt table
                $sql = <<SQL;
DROP TRIGGER ${pbt_table}_pbt\$_trg ON $schemaname.${pbt_table}
SQL
                $dbh_local->do($sql);
                if ($dbh_local->err) {
                        elog ERROR, "Could not drop the prebuilt table trigger on table '$schemaname.$pbt_table'. ERR='".$dbh_local->errstr."' SQL='$sql'";
                }

                elog NOTICE, "Prebuilt table trigger dropped." if $main::DEBUG==1;

                #-- Drop trigger's function
                $sql = <<SQL;
DROP FUNCTION $schemaname.${pbt_table}_pbt\$_trgfn();
SQL
                $dbh_local->do($sql);
                if ($dbh_local->err) {
                        elog ERROR, "Could not drop the prebuilt table trigger function '$schemaname.${pbt_table}_pbt\$_trgfn()'. ERR='".$dbh_local->errstr."' SQL='$sql'";
                }

                elog NOTICE, "Prebuilt table trigger function dropped." if $main::DEBUG==1;
        } else {
                #-- Other snapshots still use the table: put the guard trigger back
                setTriggerStatus($dbh_local, $snapshot->{'schemaname'}, $snapshot->{'pbt_table'}, "$snapshot->{'pbt_table'}_pbt\$_trg", TRUE);
        }
} else {
        #-- Actually drop the SNAPSHOT placeholder
        $sql = <<SQL;
DROP TABLE $schemaname.$snapshotname
SQL
        $sth = spi_exec_query($sql);
        if ($sth->{status} eq 'SPI_OK_UTILITY') {
                elog NOTICE, "Snapshot dropped" if $main::DEBUG==1;
        } else {
                elog ERROR, "Could not drop Snapshot '$schemaname.$snapshotname'. $sth->{status}";
        }
}

$dbh_local->commit;
$dbh_local->disconnect;
#-- All done. Let's return TRUE
return TRUE;

#--
#-- SUB: sqlLookup
#-- Strict single-row lookup: forwards the query and its bind values to
#-- sqlLookup2 with the "errors are fatal" flag switched on and returns
#-- whatever sqlLookup2 returns.
#--
sub sqlLookup {
        my $dbh = shift;
        my $sql = shift;
        my @binds = @_;
        return sqlLookup2($dbh, $sql, \@binds, TRUE);
}
#--
#-- SUB: sqlLookupSoft
#-- Lenient single-row lookup: same as sqlLookup, but the "errors are fatal"
#-- flag handed to sqlLookup2 is switched off.
#--
sub sqlLookupSoft {
        my $dbh = shift;
        my $sql = shift;
        my @binds = @_;
        return sqlLookup2($dbh, $sql, \@binds, FALSE);
}

#--
#-- SUB: sqlLookup2
#-- Prepares and executes $sql with the bind values held in the array
#-- reference $params and returns the first row of the result as a list.
#--
#-- Parameters:
#--   $dbh            database handle to run the query on
#--   $sql            statement text, with ? placeholders
#--   $params         array reference with the bind values (may be empty)
#--   $errorsAreFatal when true, a prepare/execute failure raises elog ERROR
#--
#-- Returns the first row as a list; the list is empty when the query yields
#-- no row or when it failed and errors are not fatal.
#--
sub sqlLookup2 {
        my ($dbh, $sql, $params, $errorsAreFatal) = @_;
        my $sth;
        my @row;

        elog NOTICE, "Lookup at SQL:$sql with ".((@$params != 0) ? join(',', @$params) : '<none>') if $main::DEBUG==1;

        $sth = $dbh->prepare($sql);
        if (! $sth) {
                #-- prepare itself failed: there is no statement handle to execute
                if ($errorsAreFatal) {
                        elog ERROR, "Fatal error! SQL=$sql ERROR=".$dbh->errstr;
                }
                return @row;
        }

        #-- An empty bind list behaves exactly like execute() without arguments
        $sth->execute(@$params);
        if ($sth->err) {
                if ($errorsAreFatal) {
                        elog ERROR, "Fatal error! SQL=$sql ERROR=".$sth->errstr;
                }
                #-- Soft failure: nothing was executed, so there is nothing to fetch
                return @row;
        }
        @row = $sth->fetchrow_array;
        $sth->finish;
        return @row;
}
#--
#-- SUB: snapshotExists
#-- Counts the rows of public.pg_snapshots registered under the given schema
#-- and snapshot name and returns true when that count is not 0.
#-- Raises (elog ERROR) when the count query itself fails.
#--
sub snapshotExists {
        my ($dbh, $schemaname, $snapshotname) = @_;

        my $countQuery = <<SQL;
SELECT count(*) as total
FROM public.pg_snapshots
WHERE schemaname=?
        and snapshotname=?
SQL
        my $counter = $dbh->prepare($countQuery);
        $counter->execute($schemaname, $snapshotname);
        elog ERROR, "Could not find public.pg_snapshots !!!" if $counter->err;

        my $counted = $counter->fetchrow_hashref;
        return ($counted->{'total'} ne 0);
}
#--
#-- SUB: getSnapshot
#-- Fetches the public.pg_snapshots row registered under the given schema and
#-- snapshot name and returns it as a hash reference (the value of
#-- fetchrow_hashref, so nothing useful when no row matches).
#--
sub getSnapshot {
        my ($dbh, $schemaname, $snapshotname) = @_;

        my $finder = $dbh->prepare('SELECT * FROM public.pg_snapshots WHERE schemaname=? AND snapshotname=?');
        $finder->execute($schemaname, $snapshotname);
        return $finder->fetchrow_hashref;
}
#--
#-- SUB: getDblinkById
#-- Fetches the public.pg_dblinks row with the given dblinkid and returns it
#-- as a hash reference (the value of fetchrow_hashref).
#-- Raises (elog ERROR) when the query fails to execute.
#--
sub getDblinkById {
        my ($dbh, $dblinkid) = @_;

        my $finder = $dbh->prepare('SELECT * FROM public.pg_dblinks WHERE dblinkid=?');
        $finder->execute($dblinkid);
        elog ERROR, "DBLINK '$dblinkid' does not exist!" if $finder->err;

        return $finder->fetchrow_hashref;
}
#--
#-- SUB: dbiGetConnection
#-- Connects to a DBI datasource and returns a connection handle
#--
#-- Parameters:
#--   $datasource  DBI data source name
#--   $username    user to connect as
#--   $password    password of that user
#--   $attributes  attributes passed as the fourth argument of DBI->connect
#--
#-- Raises (elog ERROR) when no handle is obtained or DBI reports an error.
#-- The error text never contains the password itself, only whether one was
#-- supplied, so credentials do not end up in logs or client messages.
#--
sub dbiGetConnection {
        my ($datasource, $username, $password, $attributes) = @_;

        # local variables
        my $dbh;

        $dbh = DBI->connect(
                $datasource
                , $username
                , $password
                , $attributes
                );
        if (! $dbh || $DBI::errstr) {
                my $shownPassword = (defined($password) && $password ne '') ? '<hidden>' : '<none>';
                #-- Spell the attribute hash out; interpolating the reference itself
                #-- would only print its address
                my $shownAttributes = (ref($attributes) eq 'HASH')
                        ? join(', ', map { "$_=" . (defined($attributes->{$_}) ? $attributes->{$_} : 'undef') } sort keys %$attributes)
                        : (defined($attributes) ? $attributes : '');
                elog ERROR, <<ERR;
Could not connect to database
data source: $datasource
user: $username
password: $shownPassword
dbh attributes:
$shownAttributes

$DBI::errstr
ERR
        }
        return $dbh;
}
#--
#-- SUB: retrieveMasterForSnapshot
#-- Retrieve the MASTER tables for a SNAPSHOT
#--
#-- Scans the snapshot's query text ($snapshot->{'query'}) for the table
#-- references that follow FROM or JOIN.
#--
#-- Parameters:
#--   $dbh       database handle (not used by this sub)
#--   $snapshot  hash reference of the snapshot; its 'kind' entry may be changed
#--
#-- Returns ($masterschema, $mastername):
#--   * one table reference, schema-qualified   -> (schema, table)
#--   * one table reference, no schema          -> (undef, table) and
#--     $snapshot->{'kind'} is set to 'C'
#--   * more than one table reference           -> (undef, undef); a kind of
#--     'F' or 'R' is changed to 'C'
#-- Raises (elog ERROR) when no table reference is found at all.
#--
sub retrieveMasterForSnapshot {
        #-- Function parameters
        my ($dbh, $snapshot) = @_;

        #-- Local variables
        my $masterschema;
        my $mastername;
        my @related=();
        my @tables=();

        my $query = $snapshot->{'query'};
        @related = $query =~ m/(from|join) +((([0-9a-z_\$]+|".+")\.)?([0-9a-z_\$]+|".+")( *, *(([0-9a-z_\$]+|".+")\.)?([0-9a-z_\$]+|".+"))*)/gi;

        #-- The pattern has 9 capture groups, so each match contributes 9 entries;
        #-- the second entry of every match is the complete table reference.
        for (my $i = 1; $i < @related; $i += 9) {
                push(@tables, $related[$i]);
        }

        if (@tables == 0) {
                elog ERROR, 'Origin TABLES not found !';
        }

        if (@tables == 1) {
                my $tablename = $tables[0];
                elog NOTICE, "TABLE=[$tablename]" if $main::DEBUG==1;
                ($masterschema, $mastername) = split(/\./, $tablename);
                elog NOTICE, "MASTERSCHEMA=[$masterschema] MASTERNAME=[$mastername]" if $main::DEBUG==1;
                if (!defined($mastername) || $mastername eq '') {
                        #-- No Schema
                        elog NOTICE, 'Sorry, due to the complexity of your query, I could not guess the Schema name of your tables, so I will try a COMPLETE REFRESH instead.';
                        $mastername = $masterschema;
                        $snapshot->{'kind'} = 'C';
                        $masterschema = undef;
                }
                return ($masterschema, $mastername);
        }

        if (($snapshot->{'kind'} eq 'F') || ($snapshot->{'kind'} eq 'R')) {
                elog NOTICE, "Refresh FAST only work with one-table only queries. Falling back to COMPLETE REFRESH. TABLES=[" . join(',', @tables) . ']';
                $snapshot->{'kind'} = 'C';
        }
        return (undef, undef);
}
#--
#-- SUB: snapshotLogExists
#-- Asks the 'snapshotlog_exists' driver function about the given master
#-- object and returns true when its answer is the string 'T'.
#--
sub snapshotLogExists {
        my ($dbh, $masterschema, $mastername) = @_;

        my $answer = call_driver_function($dbh, 'snapshotlog_exists', undef, $masterschema, $mastername, undef);
        return $answer eq 'T';
}
#--
#-- SUB: call_driver_function
#-- Runs one of the known driver functions on the database behind $dbh.
#-- The master schema and name are upper-cased before being passed on.
#--   * 'snapshot_do' is executed as a stored procedure and then committed.
#--   * the lookup functions listed below return the value of the call.
#--   * any other name raises elog ERROR.
#--
sub call_driver_function {
        my ($dbh, $function_name, $operation, $masterschema, $mastername, $additional) = @_;
        $masterschema = uc($masterschema);
        $mastername = uc($mastername);

        elog NOTICE, "Calling DRIVER function $function_name ($operation, $masterschema, $mastername, $additional)" if $main::DEBUG==1;

        #-- Arguments each value-returning driver function is called with
        my %lookupArguments = (
                'snapshotlog_exists'      => [$masterschema, $mastername],
                'last_log_refresh'        => [$masterschema, $mastername, $additional],
                'snapshotlog_columns'     => [$masterschema, $mastername],
                'snapshotlog_name'        => [$masterschema, $mastername],
                'count_log_modified_rows' => [$masterschema, $mastername, $additional],
                'snapshotlog_ud_filter'   => [$additional],
                'snapshotlog_iu_filter'   => [$additional],
        );

        if ($function_name eq 'snapshot_do') {
                dbi_execute_stored_procedure($dbh, 'snapshot_do', ('123456', $operation, $masterschema, $mastername, $additional));
        } elsif (exists $lookupArguments{$function_name}) {
                return dbi_call_function($dbh, $function_name, @{ $lookupArguments{$function_name} });
        } else {
                elog ERROR, "Could not call DRIVER function: $function_name";
        }
        $dbh->commit;
}

#--
#-- SUB: dbi_execute_stored_procedure
#-- Executes the procedure $procname with @parameters bound to placeholders.
#-- On a DBMS reporting itself as 'oracle' the call is wrapped in a
#-- BEGIN ... END; block, on anything else it is issued as a SELECT.
#-- Raises (elog ERROR) when preparing or executing the call fails.
#--
sub dbi_execute_stored_procedure {
        use DBI::Const::GetInfoType;
        my ($dbh, $procname, @parameters) = @_;
        my $dbname = lc($dbh->get_info( $GetInfoType{SQL_DBMS_NAME} ));

        #-- One ? per parameter, comma separated
        my $placeholders = join(',', ('?') x @parameters);

        my $sql = ($dbname eq 'oracle')
                ? "BEGIN $procname($placeholders); END;"
                : "SELECT $procname($placeholders)";

        my $sth = $dbh->prepare($sql);
        elog ERROR, $dbh->errstr."' SQL='$sql'" if $dbh->err;

        $sth->execute(@parameters);
        elog ERROR, $sth->errstr."' SQL='$sql'" if $sth->err;
}

#--
#-- SUB: dbi_call_function
#-- Calls the function $funcname with @parameters bound to placeholders and
#-- returns the first column of the first row it yields.
#-- On a DBMS reporting itself as 'oracle' the SELECT reads FROM DUAL.
#-- Raises (elog ERROR) when preparing or executing the call fails.
#--
sub dbi_call_function {
        use DBI::Const::GetInfoType;
        my ($dbh, $funcname, @parameters) = @_;
        my $dbname = lc($dbh->get_info( $GetInfoType{SQL_DBMS_NAME} ));

        #-- One ? per parameter, comma separated
        my $placeholders = join(',', ('?') x @parameters);

        my $sql = "SELECT $funcname($placeholders)";
        $sql .= ' FROM DUAL' if $dbname eq 'oracle';

        my $sth = $dbh->prepare($sql);
        elog ERROR, $dbh->errstr."' SQL='$sql'" if $dbh->err;

        $sth->execute(@parameters);
        elog ERROR, $sth->errstr."' SQL='$sql'" if $sth->err;

        my ($firstColumn) = $sth->fetchrow_array;
        return $firstColumn;
}
#--
#-- SUB: setTriggerStatus
#-- Enable/Disable a trigger
#--
#-- Parameters:
#--   $dbh          database handle to work on
#--   $schemaname   schema of the table owning the trigger
#--   $tablename    table owning the trigger
#--   $triggername  name of the trigger
#--   $status       true to enable the trigger, false to disable it
#--
#-- Raises (elog ERROR) when the table cannot be found or an update fails.
#--
#-- NOTE(review): the status is written straight into pg_trigger.tgenabled as
#-- a boolean; confirm this matches the catalog of the server version in use
#-- (ALTER TABLE ... ENABLE/DISABLE TRIGGER is the documented interface).
#--
sub setTriggerStatus {
        #-- Function parameters
        my ($dbh, $schemaname, $tablename, $triggername, $status) = @_;

        #-- Local variables
        my $sql;
        my $sth;
        #-- Plain truthiness: the FALSE constant used by callers is the empty string
        my $tgenabled = $status ? 'TRUE' : 'FALSE';

        $sql = <<SQL;
SELECT c.oid
FROM pg_class c
        LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname=?
        AND c.relname=?
SQL
        my ($tableid) = sqlLookup($dbh, $sql, ($schemaname, $tablename));
        if ("$tableid" eq "") {
                elog ERROR, "Could not retrieve TABLEID for table $schemaname.$tablename";
        }

        elog NOTICE, ($status ? 'Enabling' : 'Disabling')." trigger for table ID=$tableid NAME=$triggername" if $main::DEBUG==1;
        $sql = <<SQL;
UPDATE pg_trigger
SET tgenabled = $tgenabled
WHERE tgrelid=?
        AND tgname=?
SQL
        elog NOTICE, $sql if $main::DEBUG==1;
        $sth = $dbh->prepare($sql);
        $sth->execute(($tableid, $triggername));
        if ($sth->err) {
                elog ERROR, $sth->errstr;
        }

        #-- Dummy update to refresh new trigger status
        $sql = <<SQL;
UPDATE pg_class
SET relname=relname
WHERE oid=?
SQL
        $sth = $dbh->prepare($sql);
        $sth->execute(($tableid));
        if ($sth->err) {
                elog ERROR, $sth->errstr;
        }
}

$BODY$
  LANGUAGE 'plperlu' VOLATILE;
ALTER FUNCTION public.drop_snapshot(schemaname text, snapshotname text) OWNER TO postgres;
COMMENT ON FUNCTION public.drop_snapshot(schemaname text, snapshotname text) IS $$
This function is part of PostgreSQL::Snapshots project.
This is the function that removes a snapshot.
$$;
--
-- ATTENTION: This is a free software.
--            View the LICENSE.txt file for license information
--

------------------------------------------------------------------------------
-- FUNCTION: public.refresh_snapshot
--
-- Refreshes(Fills) a previously created SNAPSHOT
-- It can refresh using the FORCE, FAST or COMPLETE method
------------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION public.refresh_snapshot(schemaname text, snapshotname text)
  RETURNS bool AS
$BODY$
use strict;
use DBI;
use constant TRUE => 1;
use constant FALSE => "";
#-- Main body of public.refresh_snapshot(schemaname, snapshotname).
#-- Validates the snapshot and the caller's privileges, refreshes it (locally
#-- or through its DBLINK), records the elapsed time in public.pg_snapshots
#-- and finally runs VACUUM ANALYZE on the refreshed table. Returns TRUE.
#-- Function parameters
my ($schemaname, $snapshotname) = @_;
$schemaname = lc($schemaname);
$snapshotname = lc($snapshotname);
#-- Set this to 1 for debugging messages
$main::DEBUG=0;
#-- Data source of the local database; shared by both local connections below
my $local_dsn = 'dbi:Pg:dbname=labpeTeste';
#-- Localhost superuser connection (explicit transactions, committed below)
my $dbh_local = DBI->connect($local_dsn, 'postgres', undef, {AutoCommit => 0});
if (! $dbh_local) {
        #-- Without this check the first method call on undef would die with an obscure message
        elog ERROR, "Could not connect to local database '$local_dsn': $DBI::errstr";
}
#-- Remote database connection
my $dbh;
#-- Start time of this function
my $start_time = time();
my $sql = '';
my $rs;
my $row;
#-- Number of records refreshed
my $recs;

if (! snapshotExists($dbh_local, $schemaname, $snapshotname)) {
        elog ERROR, "Snapshot '$schemaname.$snapshotname' does not exist";
}

my $snapshot = getSnapshot($dbh_local, $schemaname, $snapshotname);
#-- The physical target is the 'pbt_table' when one is set, else the snapshot itself
my $snapname = ("$snapshot->{'pbt_table'}" eq '') ? $snapshotname : $snapshot->{'pbt_table'};

#-- Test for object's privileges
my ($hasInsert, $hasDelete) = testObjectPrivileges($schemaname,$snapname,('insert', 'delete'));
if (! $hasInsert) {
        elog ERROR, "You don't have INSERT privilege on '$schemaname.$snapshotname' SNAPSHOT";
}
if (! $hasDelete) {
        elog ERROR, "You don't have DELETE privilege on '$schemaname.$snapshotname' SNAPSHOT";
}

if ("$snapshot->{'dblinkid'}" eq '') {
        #-- refresh local
        $recs = refreshSnapshot($dbh_local, $snapshot, undef);
} else {
        #-- refresh remote
        $recs = refreshSnapshot($dbh_local, $snapshot, getDblinkById($dbh_local, $snapshot->{'dblinkid'}));
}

#-- Compute the elapsed time
my $stop_time = time();
my $secs = $stop_time - $start_time;
elog NOTICE, "Refreshed $recs records in $secs seconds.";
$sql = "UPDATE public.pg_snapshots SET elapsedtime=? WHERE schemaname=? and snapshotname=?";
$rs=$dbh_local->prepare($sql);
$rs->execute(($secs, $schemaname, $snapshotname));
if ($rs->err) {
        #-- Bookkeeping failure is reported but does not fail the refresh
        elog NOTICE, "Could not update snapshot '$snapshot->{schemaname}.$snapshot->{snapshotname}' information. Error:" . $rs->errstr;
}
$dbh_local->commit;
$dbh_local->disconnect;

#-- Renew internal statistics about the refreshed object.
#-- A fresh AutoCommit connection is used because VACUUM cannot run inside a
#-- transaction block; no commit is issued on it (it would be ineffective).
$dbh_local = DBI->connect($local_dsn, 'postgres', undef, {AutoCommit => 1});

if (! $dbh_local) {
        #-- The refresh itself is already committed, so this is only a notice
        elog NOTICE, "Could not vacuum snapshot '$schemaname.$snapname':" . $DBI::errstr;
} else {
        if (! vacuum($dbh_local, $snapshot->{schemaname}, $snapname)) {
                elog NOTICE, "Could not vacuum snapshot '$schemaname.$snapname':" . $dbh_local->errstr;
        }
        $dbh_local->disconnect;
}

#-- All done. Let's return TRUE
return TRUE;

#--
#-- SUB: sqlLookup
#-- Returns a one row result from a SQL query
#--
sub sqlLookup {
        #-- Strict lookup: first row of the query; SQL errors are fatal.
        #-- Arguments: database handle, SQL text, then the bind values.
        my $dbh = shift;
        my $sql = shift;
        my @bind_values = @_;
        return sqlLookup2($dbh, $sql, [@bind_values], TRUE);
}
sub sqlLookupSoft {
        #-- Lenient lookup: same as sqlLookup, but SQL errors are not fatal.
        #-- Arguments: database handle, SQL text, then the bind values.
        my $dbh = shift;
        my $sql = shift;
        my @bind_values = @_;
        return sqlLookup2($dbh, $sql, [@bind_values], FALSE);
}

#-- Runs a query and returns its first row as a list.
#--   $dbh            database handle to run the query on
#--   $sql            SQL text (may contain '?' placeholders)
#--   $params         array reference with the bind values (may be empty)
#--   $errorsAreFatal when true a SQL error raises ERROR; otherwise an empty
#--                   list is returned
sub sqlLookup2 {
        my ($dbh, $sql, $params, $errorsAreFatal) = @_;
        my @binds = defined($params) ? @$params : ();

        elog NOTICE, "Lookup at SQL:$sql with ".(@binds ? join(',', @binds) : '<none>') if $main::DEBUG==1;

        my $sth = $dbh->prepare($sql);
        #-- execute() with an empty list is the same as execute() without arguments
        $sth->execute(@binds) if $sth;

        #-- The handle that carries the error: $dbh when prepare failed, else $sth
        my $failed = (! $sth) ? $dbh : ($sth->err ? $sth : undef);
        if ($failed) {
                if ($errorsAreFatal) {
                        elog ERROR, "Fatal error! SQL=$sql ERROR=".$failed->errstr;
                }
                #-- Soft mode: nothing to fetch from a failed statement
                return ();
        }

        my @row = $sth->fetchrow_array;
        $sth->finish;
        return @row;
}
#--
#-- SUB: snapshotExists
#-- Returns whether a snapshot exists or not
#--
sub snapshotExists {
        #-- Arguments: local database handle, schema name, snapshot name
        my ($dbh, $schemaname, $snapshotname) = @_;

        #-- Count the catalog entries registered under this schema/name pair
        my $count_query = <<SQL;
SELECT count(*) as total
FROM public.pg_snapshots
WHERE schemaname=?
        and snapshotname=?
SQL
        my $cursor = $dbh->prepare($count_query);
        $cursor->execute($schemaname, $snapshotname);
        elog ERROR, "Could not find public.pg_snapshots !!!" if $cursor->err;

        #-- True when at least one catalog row matched
        my $counted = $cursor->fetchrow_hashref;
        return ($counted->{'total'} ne 0);
}
#--
#-- SUB: testObjectPrivileges
#-- Tests the passed privileges of the current user against the target database object and returns a result array
#--
#-- Checks, through has_table_privilege(), which of the given privileges the
#-- current user holds on "$schemaname.$snapshotname".
#-- Returns one boolean per requested privilege, in the same order.
sub testObjectPrivileges {
        #-- Function parameters
        my ($schemaname,$snapshotname,@privileges) = @_;

        #-- Local variables
        my @result=();

        #-- Nothing to test: do not send a column-less SELECT to the server
        return @result if ! @privileges;

        #-- Double embedded single quotes so the names stay inside their SQL literals
        (my $relation = "$schemaname.$snapshotname") =~ s/'/''/g;

        #-- Create the query: one has_<privilege> column per requested privilege
        my @columns;
        foreach my $privilege (@privileges) {
                (my $literal = $privilege) =~ s/'/''/g;
                push @columns, " has_table_privilege('$relation', '$literal') as has_$privilege";
        }
        my $sql = 'SELECT ' . join(',', @columns);
        elog NOTICE, $sql if $main::DEBUG==1;

        #-- Runs through SPI, i.e. inside the calling session
        my $rv = spi_exec_query($sql);
        foreach my $privilege (@privileges) {
                #-- The code expects booleans to come back from SPI as 't'/'f'
                push @result, ($rv->{rows}[0]->{"has_${privilege}"} eq 't');
        }
        elog NOTICE, 'PRIVILEGES:('.join(',', @privileges).')=('.join(',', @result).')' if $main::DEBUG==1;
        return @result;
}
#--
#-- SUB: getSnapshot
#-- Returns the named SNAPSHOT
#--
sub getSnapshot {
        #-- Arguments: local database handle, schema name, snapshot name
        my ($dbh, $schemaname, $snapshotname) = @_;

        #-- Fetch the catalog row; the result is a hash reference keyed by column name
        my $lookup = $dbh->prepare('SELECT * FROM public.pg_snapshots WHERE schemaname=? AND snapshotname=?');
        $lookup->execute($schemaname, $snapshotname);
        return $lookup->fetchrow_hashref;
}
#--
#-- SUB: refreshSnapshot
#-- Refresh a SNAPSHOT with a database query result
#--

#-- Perl trim function to remove whitespace from the start and end of the string
sub trim($)
{
        #-- Work on a copy so the caller's value is left untouched
        my ($text) = @_;
        for ($text) {
                s/^\s+//;       #-- leading whitespace
                s/\s+$//;       #-- trailing whitespace
        }
        return $text;
}

#-- Returns the current date/time
#-- Returns the current local date/time formatted as 'YYYY-MM-DD HH:MM:SS'.
#-- Uses POSIX::strftime instead of shelling out to date(1): same format,
#-- no child process and no dependency on the server's PATH.
sub getNow {
        require POSIX;
        return POSIX::strftime('%Y-%m-%d %H:%M:%S', localtime());
}

#-- Refreshes one snapshot and returns the number of refreshed records.
#--   $dbh_local  handle of the database that holds the snapshot
#--   $snapshot   hash reference with the snapshot's catalog row
#--   $dblink     hash reference with the DBLINK row, or undef for a local refresh
sub refreshSnapshot {
        #-- Function parameters
        my ($dbh_local, $snapshot, $dblink) = @_;

        #-- Local variables
        my $recs = 0;
        my $attr_href;
        my $dbh_remote;
        my $masterschema;
        my $mastername;
        my $kind;
        my $lastRefresh;
        my $refreshTime;

        if ($dblink) {
                #--create the remote database connection
                #-- SECURITY NOTE: the attributes column is evaluated as Perl code, so
                #-- whoever can write to public.pg_dblinks can run arbitrary code here.
                $attr_href = eval($dblink->{'attributes'});
                if ($@) {
                        #-- Report a broken attribute string; the connection below then uses default attributes
                        elog NOTICE, "Could not evaluate attributes of DBLINK '$dblink->{'dblinkname'}': $@";
                }
                $dbh_remote = dbiGetConnection(
                        $dblink->{'datasource'}
                        , $dblink->{'username'}
                        , $dblink->{'password'}
                        , $attr_href
                        );
        } else {
                #-- Local refresh: source and target share the same connection
                $dbh_remote = $dbh_local;
        }
        $dbh_remote->{LongReadLen} = 64 * 1024; #--LOB <= 64Kb

        if ($snapshot->{'kind'} eq 'C') {
                #-- COMPLETE was requested: no master/log information is needed
                $masterschema = undef;
                $mastername = undef;
                $kind = 'C';
                $lastRefresh = undef;
        } else {
                ($masterschema, $mastername, $kind, $lastRefresh) = prepareForRefresh($dbh_remote, $snapshot);
        }
        #-- Taken before the data is copied; later stored as the refresh time
        $refreshTime = getNow();

        if ($kind eq 'C') {
                #-- Refresh Complete
                $recs = performCompleteRefresh($dbh_local, $dbh_remote, $snapshot);
        } else {
                #-- Refresh FAST
                $recs = performFastRefresh($dbh_local, $dbh_remote, $snapshot, $masterschema, $mastername, $lastRefresh);
        }
        finalizeRefresh($dbh_remote, $masterschema, $mastername, $snapshot->{'snapid'}, $refreshTime);
        if (snapshotLogExists($dbh_remote, $masterschema, $mastername)) {
                elog NOTICE, 'Purging Log' if $main::DEBUG==1;
                purgeSnapshotLog($dbh_remote, $masterschema, $mastername);
        }
        #-- '0E0' is DBI's "zero rows, but true" value; report it as a plain 0
        if ($recs eq '0E0') {
                $recs = 0;
        }
        if ($dblink) {
                #-- Only the connection opened here is closed; $dbh_local belongs to the caller
                $dbh_remote->disconnect;
        }
        return $recs;
}
#--
#-- SUB: getDblinkById
#-- Returns a DBLINK by its ID
#--
#-- Returns the public.pg_dblinks row with the given id as a hash reference.
#-- Raises ERROR when the lookup fails or when no such DBLINK exists, so the
#-- caller never receives undef (which it would treat as "refresh locally").
sub getDblinkById {
        #-- Function parameters
        my ($dbh, $dblinkid) = @_;

        #--get dblink info
        my $sth = $dbh->prepare('SELECT * FROM public.pg_dblinks WHERE dblinkid=?');
        $sth->execute($dblinkid);
        if ($sth->err) {
                elog ERROR, "Could not look up DBLINK '$dblinkid': " . $sth->errstr;
        }

        my $dblink = $sth->fetchrow_hashref;
        if (! $dblink) {
                elog ERROR, "DBLINK '$dblinkid' does not exist!";
        }
        return $dblink;
}
#--
#-- SUB: vacuum
#-- Performs a VACUUM ANALYZE on a target object
#--
sub vacuum {
        #-- Arguments: database handle, schema name, table (snapshot) name
        my ($dbh, $schemaname, $snapshotname) = @_;

        $dbh->do("VACUUM ANALYZE $schemaname.$snapshotname");

        #-- Success means the handle reports no error after the statement
        my $failure = $dbh->err;
        return (! $failure);
}

#--Dependencies
#--
#-- SUB: dbiGetConnection
#-- Connects to a DBI datasource and returns a connection handle
#--
#-- Opens a DBI connection and returns its handle.
#--   $datasource  DBI data source name
#--   $username    user to connect as
#--   $password    password (never echoed in error messages)
#--   $attributes  hash reference with DBI connection attributes, or undef
#-- Raises ERROR when the connection cannot be established.
sub dbiGetConnection {
        my ($datasource, $username, $password, $attributes) = @_;

        my $dbh = DBI->connect(
                $datasource
                , $username
                , $password
                , $attributes
                );
        #-- Test the returned handle itself: connect() returns undef on failure
        if (! $dbh) {
                #-- Render the attributes readably instead of printing HASH(0x...)
                my $attr_text = '<none>';
                if (ref($attributes) eq 'HASH') {
                        $attr_text = join(', ',
                                map { "$_ => " . (defined($attributes->{$_}) ? $attributes->{$_} : 'undef') }
                                sort keys %$attributes);
                }
                #-- Do not leak the password into server logs / client messages
                my $password_text = (defined($password) && $password ne '') ? '<hidden>' : '<none>';
                elog ERROR, <<ERR;
Could not connect to database
data source: $datasource
user: $username
password: $password_text
dbh attributes:
$attr_text

$DBI::errstr
ERR
        }
        return $dbh;
}
#--
#-- SUB: prepareForRefresh
#-- Prepares the local SNAPSHOT for the refresh process, removing old rows, old logs, etc.
#--
sub prepareForRefresh {
        #-- Arguments: handle of the source database, snapshot catalog row
        my ($dbh, $snapshot) = @_;

        #-- Defaults: COMPLETE refresh and no known previous refresh
        my ($kind, $lastRefresh) = ('C', undef);
        my $snapid = $snapshot->{'snapid'};
        my ($masterschema, $mastername) = retrieveMasterForSnapshot($dbh, $snapshot);

        #-- Only a master with a known schema and a snapshot log can refine the method
        my $hasMasterSchema = defined($masterschema) && $masterschema ne '';
        if ($hasMasterSchema && snapshotLogExists($dbh, $masterschema, $mastername)) {
                call_driver_function($dbh, 'snapshot_do', 'REGISTER', $masterschema, $mastername, $snapid);
                $lastRefresh = getLastSnapshotLogRefresh($dbh, $masterschema, $mastername, $snapid);
                elog NOTICE, "Last refresh on $lastRefresh" if $main::DEBUG==1;
                $kind = getRefreshMethod($dbh, $snapshot, $masterschema, $mastername, $lastRefresh);
        }

        elog NOTICE, "Refresh method=$kind" if $main::DEBUG==1;
        return ($masterschema, $mastername, $kind, $lastRefresh);
}

#--
#-- SUB: performCompleteRefresh
#-- Performs a COMPLETE REFRESH on a snapshot
#--
#-- Empties the snapshot (or its slice of a 'pbt_table') and refills it with
#-- the result of the snapshot's query.
#--   $dbh_local   handle of the database that holds the snapshot
#--   $dbh_remote  handle of the database the query runs on
#--   $snapshot    hash reference with the snapshot's catalog row
#-- Returns the number of records inserted.
sub performCompleteRefresh {
        #-- Function parameters
        my ($dbh_local, $dbh_remote, $snapshot) = @_;

        #-- Local variables
        my $sql;
        my $sth;
        my $sth_local;
        my $recs;
        my $errors;
        my $snapshotname;
        my $targetColumnList;

        #--Fetches the remote database query
        $sql = $snapshot->{'query'};
        elog NOTICE, "sql is \n$sql" if $main::DEBUG==1;

        $sth = $dbh_remote->prepare($sql);
        if ($dbh_remote->err) {
                elog ERROR, "Could not fetch remote query: $sql [".$dbh_remote->errstr."]";
        }
        $sth->execute;
        #-- Stop before the snapshot is emptied when the source query cannot run
        if ($sth->err) {
                elog ERROR, "Could not execute remote query: $sql [".$sth->errstr."]";
        }
        #-- Parameterized query placeholders (question marks)
        my $qms = '?,' x $sth->{NUM_OF_FIELDS};
        chop($qms);

        $snapshotname = ($snapshot->{'pbt_table'} eq '') ? $snapshot->{'snapshotname'} : $snapshot->{'pbt_table'};
        $targetColumnList = join(',', getColumnList($sth));
        if ($snapshot->{'pbt_table'} eq '') {
                #--Truncates (empty) the local SNAPSHOT
                $sql = <<SQL;
TRUNCATE $snapshot->{'schemaname'}.$snapshotname
SQL
                $dbh_local->do($sql);
                if (! $dbh_local->err) {
                        elog NOTICE, "Snapshot truncated" if $main::DEBUG==1;
                } else {
                        elog ERROR, "Could not truncate snapshot: $sql [".$dbh_local->errstr."]";
                }
        } else {
                #-- NOTE(review): the trigger is re-enabled only at the end of this sub; if an
                #-- ERROR is raised in between it looks like it stays disabled -- confirm.
                setTriggerStatus($dbh_local, $snapshot->{'schemaname'}, $snapshotname, "${snapshotname}_pbt\$_trg", FALSE);
                #-- The snapid is written as a literal first value for the extra pbt$ column
                $targetColumnList = "pbt\$,$targetColumnList";
                $qms = "$snapshot->{'snapid'},$qms";
                #-- Deletes all rows where pbt$ eq $snapid
                $sql = <<SQL;
DELETE FROM $snapshot->{'schemaname'}.$snapshotname
WHERE pbt\$ = ?
SQL
                my $sth_aux = $dbh_local->prepare($sql);
                $sth_aux->execute(($snapshot->{'snapid'}));
                if (! $sth_aux->err) {
                        elog NOTICE, "Snapshot cleaned." if $main::DEBUG==1;
                } else {
                        elog ERROR, "Could not clean snapshot: $sql [".$sth_aux->errstr."]";
                }
        }
        $dbh_local->commit;

        #-- Fill the SNAPSHOT with the remote database query results

        #-- Prepare the INSERT query that will be executed for each returned query
        $sql = <<SQL;
INSERT INTO $snapshot->{'schemaname'}.$snapshotname($targetColumnList)
VALUES ($qms)
SQL
        elog NOTICE, $sql if $main::DEBUG==1;
        $sth_local = $dbh_local->prepare($sql);

        #-- Set the type of target row parameters to the same type of source row columns
        my @types = getSthColumnTypes($sth);
        for (my $count=0; $count < $sth->{NUM_OF_FIELDS}; $count++) {
                $sth_local->bind_param($count + 1, undef, $types[$count]);
        }

        #-- Loop through all fetched records and insert them one at a time into the SNAPSHOT
        $recs = $errors = 0;
        while(my @row = $sth->fetchrow_array) {
                $sth_local->execute(@row);
                if ($sth_local->err) {
                        #-- The first failed insert aborts the whole refresh
                        if ($errors == 0) {
                                elog ERROR, $sth_local->errstr;
                        }
                        ++$errors;
                }
                ++$recs;
                #-- Commit in batches of 1000 records
                if (($recs % 1000) == 0) {
                        $dbh_local->commit;
                        elog NOTICE, "Commit. Record #$recs" if $main::DEBUG==1;
                }
        }
        #-- fetchrow_array also ends the loop on a fetch error
        if ($sth->err) {
                elog ERROR, $sth->errstr;
        }
        $dbh_local->commit;
        $sth_local->finish;

        $sth->finish;

        elog NOTICE, "All $recs records processed. $errors errors." if $main::DEBUG==1;
        if ($snapshot->{'pbt_table'} ne '') {
                setTriggerStatus($dbh_local, $snapshot->{'schemaname'}, $snapshotname, "${snapshotname}_pbt\$_trg", TRUE);
        }
        #-- Returns the number of records inserted
        return $recs-$errors;
}
#--
#-- SUB: performFastRefresh
#-- Performs a FAST REFRESH on a snapshot
#--
#-- Refreshes a snapshot incrementally from the master's snapshot log: old
#-- snapshot records are removed and the rows the log filter selects are
#-- fetched from the source query and inserted again.
#--   $dbh_local     handle of the database that holds the snapshot
#--   $dbh_remote    handle of the database the query and the log live on
#--   $snapshot      hash reference with the snapshot's catalog row
#--   $masterschema  schema of the master table
#--   $mastername    name of the master table
#--   $lastRefresh   time of the previous refresh
#-- Returns the number of records inserted.
sub performFastRefresh {
        #-- Function parameters
        my ($dbh_local, $dbh_remote, $snapshot, $masterschema, $mastername, $lastRefresh) = @_;

        #-- Local variables
        my $sth_local;
        my $sth_remote;
        my $masterKeys = getSnapshotLogColumns($dbh_remote, $masterschema, $mastername);
        my $qms;
        my $recs;
        my $errors;
        my $sql;
        my @types;
        my ($logKeys, $arrLogKeys) = getLogKeys ($masterKeys);
        my @keys = ($masterKeys, $logKeys, @$arrLogKeys);
        my $snapshotLogName = getSnapshotLogName($dbh_remote, $masterschema, $mastername);
        my $snapshotname;
        my $targetColumnList;

        #-- Remove old records from the snapshot
        my $deleted = deleteOldSnapshotRecords($dbh_local, $dbh_remote, $snapshot, $masterschema, $mastername, $lastRefresh, @keys);

        #-- Join condition between the source query and the log keys, pairing
        #-- the key columns position by position
        my @arrMasterKeys = getSourceKeysFromQuery($snapshot->{'query'}, split(',', $masterKeys));
        my $filter = join(' AND ',
                map { 'source.'.$arrMasterKeys[$_].'=keys.'.$arrLogKeys->[$_] } 0 .. $#arrMasterKeys);
        my $logFilter = getInsertedUpdatedLogRecordsFilter($dbh_remote, $lastRefresh);
        my $source = $snapshot->{'query'};
        $sql = <<SQL;
SELECT source.*
FROM ($source) source,
        (SELECT $logKeys
        FROM $masterschema.$snapshotLogName
        WHERE $logFilter) keys
WHERE $filter
SQL
        elog NOTICE, "Filter SQL is: $sql" if $main::DEBUG==1;
        elog NOTICE, "LastRefresh is: $lastRefresh" if $main::DEBUG==1;
        $sth_remote = $dbh_remote->prepare($sql);
        if ($dbh_remote->err) {
                elog ERROR, "Could not apply filter rules to original query: $sql [".$dbh_remote->errstr."]";
        }

        #-- Fill it with the result query
        $sth_remote->execute();
        if ($dbh_remote->err) {
                elog ERROR, "Could not retrieve records to insert: $sql [".$dbh_remote->errstr."]";
        }

        $qms = '?,' x $sth_remote->{NUM_OF_FIELDS};
        chop($qms);

        #-- For a 'pbt_table' target the snapid is written as a literal first value
        if ($snapshot->{'pbt_table'} ne '') {
                $qms = "$snapshot->{'snapid'},$qms";
        }

        $snapshotname = ($snapshot->{'pbt_table'} eq '') ? $snapshot->{'snapshotname'} : $snapshot->{'pbt_table'};
        $targetColumnList = join(',', getColumnList($sth_remote));

        if ($snapshot->{'pbt_table'} ne '') {
                $targetColumnList = "pbt\$,$targetColumnList";
        }
        $sql = <<SQL;
INSERT INTO $snapshot->{'schemaname'}.$snapshotname($targetColumnList)
VALUES ($qms)
SQL
        $sth_local = $dbh_local->prepare($sql);
        if ($dbh_local->err) {
                elog ERROR, "Could not add new records into snapshot: $sql [".$dbh_local->errstr."]";
        }

        #-- Set the type of target row parameters to the same type of source row columns
        @types = getSthColumnTypes($sth_remote);
        for (my $count=0; $count < $sth_remote->{NUM_OF_FIELDS}; $count++) {
                $sth_local->bind_param($count + 1, undef, $types[$count]);
        }

        elog NOTICE, 'Fetching records.' if $main::DEBUG==1;
        #-- Loop through all fetched records and insert them one at a time into the SNAPSHOT
        $recs = 0;
        $errors = 0;
        setTriggerStatus($dbh_local, $snapshot->{'schemaname'}, $snapshotname, "${snapshotname}_pbt\$_trg", FALSE);
        while(my @row = $sth_remote->fetchrow_array) {
                $sth_local->execute(@row);
                if ($sth_local->err) {
                        #-- Failed inserts are counted, not fatal
                        ++$errors;
                }
                ++$recs;
                #-- Commit in batches of 1000 records
                if (($recs % 1000) == 0) {
                        $dbh_local->commit;
                        elog NOTICE, "Inserted. Record #$recs" if $main::DEBUG==1;
                }
        }
        setTriggerStatus($dbh_local, $snapshot->{'schemaname'}, $snapshotname, "${snapshotname}_pbt\$_trg", TRUE);
        if (! $sth_remote->err) {
                $recs = $recs - $errors;
                elog NOTICE, "Inserted $recs modified records" if $main::DEBUG==1;
                if ($errors > 0) {
                        if ($main::DEBUG==1) {
                                elog ERROR, "$errors errors on insertion process";
                        } else {
                                #-- Outside debug mode the refresh still succeeds, but the
                                #-- failed inserts are no longer discarded silently
                                elog NOTICE, "$errors errors on insertion process";
                        }
                }
        } else {
                elog ERROR, "Could not insert modified snapshot records: $sql [".$sth_remote->errstr."]";
        }

        $dbh_local->commit;
        $sth_local->finish;

        return $recs;
}

#--
#-- SUB: finalizeRefresh
#-- Finalize the refresh process updating control fields.
#--
sub finalizeRefresh {
        #-- Arguments: source database handle, master schema/name, snapshot id, refresh time
        my ($dbh, $masterschema, $mastername, $snapid, $refreshTime) = @_;

        #-- No master schema (undef or empty): nothing to record
        return unless defined($masterschema) && $masterschema ne '';
        #-- Control fields are only updated when the master has a snapshot log
        return unless snapshotLogExists($dbh, $masterschema, $mastername);

        elog NOTICE, "Setting refresh time to $refreshTime" if $main::DEBUG==1;
        updateLastSnapshotLogRefresh($dbh, $masterschema, $mastername, $refreshTime, $snapid);
        updateNullSnaptimeRows($dbh, $masterschema, $mastername, $refreshTime);
}

#--
#-- SUB: purgeSnapshotLog
#-- Removes old SNAPSHOT LOG entries
#--
sub purgeSnapshotLog {
        #-- Arguments: source database handle, master schema, master name
        my $dbh = shift;
        my ($masterschema, $mastername) = @_;

        #-- PURGELOG takes no additional argument
        my @request = ('snapshot_do', 'PURGELOG', $masterschema, $mastername, undef);
        call_driver_function($dbh, @request);
}

#--
#-- SUB: call_driver_function
#-- Calls a driver function (native) on the database pointed by the database handle ($dbh)
#--
sub call_driver_function {
        my ($dbh, $function_name, $operation, $masterschema, $mastername, $additional) = @_;
        #-- Master identifiers are always sent upper-cased
        ($masterschema, $mastername) = (uc($masterschema), uc($mastername));

        elog NOTICE, "Calling DRIVER function $function_name ($operation, $masterschema, $mastername, $additional)" if $main::DEBUG==1;

        #-- Value-returning driver functions and the arguments each one receives
        my %function_args = (
                'snapshotlog_exists'      => [$masterschema, $mastername],
                'last_log_refresh'        => [$masterschema, $mastername, $additional],
                'snapshotlog_columns'     => [$masterschema, $mastername],
                'snapshotlog_name'        => [$masterschema, $mastername],
                'count_log_modified_rows' => [$masterschema, $mastername, $additional],
                'snapshotlog_ud_filter'   => [$additional],
                'snapshotlog_iu_filter'   => [$additional],
        );
        if (exists $function_args{$function_name}) {
                return dbi_call_function($dbh, $function_name, @{ $function_args{$function_name} });
        }

        if ($function_name ne 'snapshot_do') {
                elog ERROR, "Could not call DRIVER function: $function_name";
        } else {
                #-- 'snapshot_do' is the only procedure-style call; it is committed below
                dbi_execute_stored_procedure($dbh, 'snapshot_do', ('123456', $operation, $masterschema, $mastername, $additional));
        }
        $dbh->commit;
}

sub dbi_execute_stored_procedure {
        use DBI::Const::GetInfoType;
        #-- Arguments: database handle, procedure name, then the procedure's parameters
        my ($dbh, $procname, @parameters) = @_;
        my $engine = lc($dbh->get_info( $GetInfoType{SQL_DBMS_NAME} ));
        #-- One '?' placeholder per parameter
        my $placeholders = join(',', ('?') x @parameters);

        #-- Oracle needs an anonymous block; every other engine uses a SELECT
        my $call = ($engine eq 'oracle')
                ? "BEGIN $procname($placeholders); END;"
                : "SELECT $procname($placeholders)";

        my $sth = $dbh->prepare($call);
        elog ERROR, $dbh->errstr."' SQL='$call'" if $dbh->err;

        $sth->execute(@parameters);
        elog ERROR, $sth->errstr."' SQL='$call'" if $sth->err;
}

sub dbi_call_function {
        use DBI::Const::GetInfoType;
        #-- Arguments: database handle, function name, then the function's parameters
        my ($dbh, $funcname, @parameters) = @_;
        my $engine = lc($dbh->get_info( $GetInfoType{SQL_DBMS_NAME} ));
        #-- One '?' placeholder per parameter
        my $placeholders = join(',', ('?') x @parameters);

        #-- Oracle requires a FROM clause; every other engine uses a bare SELECT
        my $call = "SELECT $funcname($placeholders)";
        $call .= ' FROM DUAL' if $engine eq 'oracle';

        my $sth = $dbh->prepare($call);
        elog ERROR, $dbh->errstr."' SQL='$call'" if $dbh->err;

        $sth->execute(@parameters);
        elog ERROR, $sth->errstr."' SQL='$call'" if $sth->err;

        #-- The function result is the first column of the first row
        my @first_row = $sth->fetchrow_array;
        return $first_row[0];
}
#--
#-- SUB: setTriggerStatus
#-- Enable/Disable a trigger
#--
#-- Enables or disables a trigger by updating pg_trigger directly.
#--   $dbh          handle of the database that owns the table
#--   $schemaname   schema of the table
#--   $tablename    table the trigger belongs to
#--   $triggername  name of the trigger
#--   $status       TRUE to enable the trigger, anything else to disable it
#-- Raises ERROR when the table cannot be found or an update fails. A trigger
#-- name that matches no pg_trigger row simply updates nothing.
sub setTriggerStatus {
        #-- Function parameters
        my ($dbh, $schemaname, $tablename, $triggername, $status) = @_;

        #-- Local variables
        my $sql;
        my $sth;
        my $tgenabled = ($status == TRUE) ? 'TRUE' : 'FALSE';
        my $action = ($tgenabled eq 'TRUE') ? 'Enabling' : 'Disabling';

        $sql = <<SQL;
SELECT c.oid
FROM pg_class c
        LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname=?
        AND c.relname=?
SQL
        my ($tableid) = sqlLookup($dbh, $sql, ($schemaname, $tablename));
        if ("$tableid" eq "") {
                elog ERROR, "Could not retrieve TABLEID for table $schemaname.$tablename";
        }

        elog NOTICE, "$action trigger for table ID=$tableid NAME=$triggername" if $main::DEBUG==1;
        #-- NOTE(review): this writes a boolean into pg_trigger.tgenabled; presumably
        #-- only valid where that column is boolean -- verify against the target server version.
        $sql = <<SQL;
UPDATE pg_trigger
SET tgenabled = $tgenabled
WHERE tgrelid=?
        AND tgname=?
SQL
        elog NOTICE, $sql if $main::DEBUG==1;
        $sth = $dbh->prepare($sql);
        $sth->execute(($tableid, $triggername));
        if ($sth->err) {
                elog ERROR, $sth->errstr;
        }

        #-- Dummy update to refresh new trigger status
        $sql = <<SQL;
UPDATE pg_class
SET relname=relname
WHERE oid=?
SQL
        $sth = $dbh->prepare($sql);
        $sth->execute(($tableid));
        if ($sth->err) {
                elog ERROR, $sth->errstr;
        }
}
#--
#-- SUB: getColumnList
#-- Returns an array with all columns on a statement
#--
sub getColumnList {
        #-- Argument: an executed statement handle
        my ($sth) = @_;

        #-- Column names in result-set order, one per field of the statement
        my $field_count = $sth->{NUM_OF_FIELDS};
        return map { $sth->{NAME}->[$_] } 0 .. $field_count - 1;
}
#--
#-- getSourceKeysFromQuery
#--
sub getSourceKeysFromQuery {
        #-- Arguments: the snapshot query text, then the master key column names
        my ($query, @masterKeys) = @_;

        #-- Flat list of (expression, alias) pairs, one pair per "<expr> AS <alias>" found
        my @aliasPairs = $query =~ m/ +([0-9a-z_\$]+|".+") +as +([0-9a-z_\$]+|".+")/gi;

        #-- Each key maps to the alias of its first matching pair, or to itself
        my @keys = ();
        foreach my $masterKey (@masterKeys) {
                my $resolved = $masterKey;
                for (my $at = 0; $at < @aliasPairs; $at += 2) {
                        next if $masterKey ne $aliasPairs[$at];
                        $resolved = $aliasPairs[$at + 1];
                        last;
                }
                push(@keys, $resolved);
        }
        return @keys;
}


#--
#-- SUB: retrieveMasterForSnapshot
#-- Retrieve the MASTER tables for a SNAPSHOT
#--
#-- Extracts the master table of a snapshot from the text of its query.
#--   $dbh       database handle (not used by this sub)
#--   $snapshot  hash reference with the snapshot's catalog row; its 'kind'
#--              is changed to 'C' when a FAST refresh is not possible
#-- Returns ($masterschema, $mastername). The schema is undef when the single
#-- table is not schema-qualified; both are undef for multi-table queries.
#-- Raises ERROR when no table is found in the query.
sub retrieveMasterForSnapshot {
        #-- Function parameters
        my ($dbh, $snapshot) = @_;

        #-- Local variables
        my $masterschema;
        my $mastername;
        my @tables=();

        #-- Every FROM/JOIN match yields 9 captures; the 2nd is the table list
        my $query = $snapshot->{'query'};
        my @related = $query =~ m/(from|join) +((([0-9a-z_\$]+|".+")\.)?([0-9a-z_\$]+|".+")( *, *(([0-9a-z_\$]+|".+")\.)?([0-9a-z_\$]+|".+"))*)/gi;

        for (my $i = 1; $i < @related; $i += 9) {
                push(@tables, $related[$i]);
        }

        if (@tables == 0) {
                elog ERROR, 'Origin TABLES not found !';
        }

        if (@tables == 1) {
                my $tablename = $tables[0];
                elog NOTICE, "TABLE=[$tablename]" if $main::DEBUG==1;
                ($masterschema, $mastername) = split(/\./, $tablename);
                elog NOTICE, "MASTERSCHEMA=[$masterschema] MASTERNAME=[$mastername]" if $main::DEBUG==1;
                if (! defined($mastername) || $mastername eq '') {
                        #-- No Schema: the only component found is the table name
                        elog NOTICE, 'Sorry, due to the complexity of your query, I could not guess the Schema name of your tables, so I will try a COMPLETE REFRESH instead.';
                        $mastername = $masterschema;
                        $snapshot->{'kind'} = 'C';
                        $masterschema = undef;
                }
                return ($masterschema, $mastername);
        }

        #-- More than one table: FAST ('F') and FORCE ('R') fall back to COMPLETE
        if (($snapshot->{'kind'} eq 'F') || ($snapshot->{'kind'} eq 'R')) {
                elog NOTICE, "Refresh FAST only work with one-table only queries. Falling back to COMPLETE REFRESH. TABLES=[" . join(',', @tables) . ']';
                $snapshot->{'kind'} = 'C';
        }
        return (undef, undef);
}
#--
#-- SUB: snapshotLogExists
#-- Returns whether a master object has a SNAPSHOT LOG
#--
sub snapshotLogExists {
        #-- Arguments: source database handle, master schema, master name
        my ($dbh, $masterschema, $mastername) = @_;

        #-- The driver function answers 'T' when the master has a snapshot log
        my @driver_call = ('snapshotlog_exists', undef, $masterschema, $mastername, undef);
        my $answer = call_driver_function($dbh, @driver_call);
        return ($answer eq 'T');
}
#--
#-- SUB: getLastSnapshotLogRefresh
#-- Returns the date/time of the snapshot's last refresh
#--
sub getLastSnapshotLogRefresh {
        #-- Arguments: source database handle, master schema, master name, snapshot id
        my $dbh = shift;
        my ($masterschema, $mastername, $snapid) = @_;

        #-- Delegates to the 'last_log_refresh' driver function; no operation code is needed
        return call_driver_function($dbh, 'last_log_refresh', undef, $masterschema, $mastername, $snapid);
}
#--
#-- SUB: getRefreshMethod
#-- Returns the best refresh method supported by the SNAPSHOT refreshing process
#--
#-- Chooses the refresh method for a snapshot.
#--   $dbh           handle of the source database
#--   $snapshot      hash reference with the snapshot's catalog row
#--   $masterschema  schema of the master table
#--   $mastername    name of the master table
#--   $lastRefresh   time of the previous refresh (undef/empty on first refresh)
#-- Returns 'C' (COMPLETE) or 'F' (FAST); any other configured kind is
#-- returned unchanged. Raises ERROR when FAST was requested but cannot be done.
sub getRefreshMethod {
        #-- Function parameters
        my ($dbh, $snapshot, $masterschema, $mastername, $lastRefresh) = @_;

        #-- Local variables
        my $sql;
        my $kind = $snapshot->{'kind'};

        if (! defined($lastRefresh) || ($lastRefresh eq '') || ($lastRefresh eq '1900-01-01 00:00:00')) { #-- It's the snapshot first refresh: always COMPLETE
                elog NOTICE, 'First Refresh detected.' if $main::DEBUG==1;
                $kind = 'C';
        } elsif (($kind eq 'R') or ($kind eq 'F')) { #-- The snapshot was created with REFRESH FORCE or REFRESH FAST
                #-- Test if LOG exists
                if (snapshotLogExists($dbh, $masterschema, $mastername)) {
                        #-- count the rows at LOG
                        my $logcount = countSnapshotLogModifiedRows($dbh, $masterschema, $mastername, $lastRefresh);
                        #-- count the rows at MASTER
                        if ("$snapshot->{'pbt_table'}" ne '') {
                                $sql = <<SQL;
SELECT count(*) as total
FROM $masterschema.$snapshot->{'pbt_table'}
WHERE pbt\$ = $snapshot->{'snapid'}
SQL
                        } else {
                                $sql = <<SQL;
SELECT count(*) as total
FROM $masterschema.$mastername
SQL
                        }
                        my ($mastercount) = sqlLookup($dbh, $sql, ());
                        elog NOTICE, "Number of modified records: LOG=$logcount MASTER=$mastercount" if $main::DEBUG==1;
                        #-- FAST when it was explicitly requested, or when the log holds no more
                        #-- modified rows than the master has rows. The counts are compared
                        #-- numerically: a string comparison would rank '5' above '100'.
                        #-- NOTE(review): the previous comment mentioned count(MASTER) / 4, but the
                        #-- code never divided -- confirm the intended threshold.
                        if (($kind eq 'F') || ($logcount <= $mastercount)) {
                                $kind = 'F';
                        } else {
                                elog NOTICE, "Huge number of modified records!" if $main::DEBUG==1;
                                $kind = 'C';
                        }
                } else {
                        elog NOTICE, 'Snapshot Log does not exist.' if $main::DEBUG==1;
                        $kind = 'C';
                }
                if (($snapshot->{'kind'} eq 'F') and ($kind ne 'F')) {
                        #-- Can't do REFRESH FAST
                        elog ERROR, 'Refresh FAST not supported. Did you set up a SNAPSHOT LOG correctly ?';
                }
        }
        return $kind;
}
#--
#-- SUB: updateLastSnapshotLogRefresh
#-- Updates the SNAPSHOT LOG catalog with the SNAPSHOT last refresh
#--
sub updateLastSnapshotLogRefresh {
        #-- Arguments: source database handle, master schema/name, refresh time, snapshot id
        my ($dbh, $masterschema, $mastername, $refreshTime, $snapid) = @_;

        #-- REFRESHED receives "<refresh time>|<snapshot id>" as its additional argument
        my $stamp = join('|', $refreshTime, $snapid);
        call_driver_function($dbh, 'snapshot_do', 'REFRESHED', $masterschema, $mastername, $stamp);
}
#--
#-- SUB: updateNullSnaptimeRows
#-- Updates the SNAPSHOT LOG rows that were not viewed by anyone (snaptime column is NULL)
#--
sub updateNullSnaptimeRows {
        #-- Arguments: database handle, master schema, master name, refresh time
        my $dbh = shift;
        my ($masterschema, $mastername, $refreshTime) = @_;

        #-- Delegates to the driver's 'snapshot_do' entry point, operation 'UPDATE_NULL'
        return call_driver_function($dbh, 'snapshot_do', 'UPDATE_NULL', $masterschema, $mastername, $refreshTime);
}
#--
#-- SUB: getInsertedUpdatedLogRecordsFilter
#-- Returns the filter for retrieving snapshot log's inserted or updated records
#--
sub getInsertedUpdatedLogRecordsFilter {
        #-- Arguments: database handle and the last refresh time
        my $dbh = shift;
        my $lastRefresh = shift;

        #-- Operation, master schema and master name are not passed to this driver call
        my @driverArgs = ('snapshotlog_iu_filter', undef, undef, undef, $lastRefresh);
        return call_driver_function($dbh, @driverArgs);
}

#--
#-- SUB: countSnapshotLogModifiedRows
#-- Returns the number of modified rows on the snapshot log
#--
sub countSnapshotLogModifiedRows {
        #-- Arguments: database handle, master schema, master name, last refresh time
        my $dbh = shift;
        my ($masterschema, $mastername, $lastRefresh) = @_;

        #-- No operation code is passed to the 'count_log_modified_rows' driver function
        my @driverArgs = ('count_log_modified_rows', undef, $masterschema, $mastername, $lastRefresh);
        return call_driver_function($dbh, @driverArgs);
}
#--
#-- SUB: getSthColumnTypes
#-- Returns an array in which every index is the corresponding SQL data type for each column index of the STATEMENT parameter
#--
sub getSthColumnTypes {
        #-- Function parameters
        #--   $sth : statement handle whose {NUM_OF_FIELDS} and {TYPE} attributes are read
        my ($sth) = @_;

        #-- Result: one entry per column, in column order
        my @types;

        for my $i (0 .. $sth->{NUM_OF_FIELDS} - 1) {
                my $dbiType = $sth->{TYPE}->[$i];
                #-- Scalar element assignment (the original assigned through a
                #-- one-element array slice, which Perl warns about)
                $types[$i] = map2PgType(getUnifiedSqlType($dbiType));
        }
        return @types;
}
#--
#-- SUB: map2PgType
#-- Maps a DBI type to a PostgreSQL native type or a SQL type supported by PostgreSQL
#--
sub map2PgType {
        use DBD::Pg;
        my ($dbiType) = @_;

        #--------------------------------
        #-- The incoming value is compared numerically against the DBI SQL type
        #-- codes below, in this order; the first match wins.
        #-- Codes used here: SQL_CHAR 1, SQL_NUMERIC 2, SQL_INTEGER 4,
        #-- SQL_SMALLINT 5, SQL_DATE 9, SQL_TIME 10, SQL_INTERVAL 10,
        #-- SQL_TIMESTAMP 11, SQL_VARCHAR 12, SQL_BOOLEAN 16, SQL_BLOB 30,
        #-- SQL_CLOB 40.
        #-- SQL_TIME and SQL_INTERVAL share code 10, so the SQL_TIME entry
        #-- always matches first.
        #--
        #-- A result is either a plain DBI SQL type code or a hash reference
        #-- of the form { pg_type => <DBD::Pg type constant> }.
        #--------------------------------
        my @mapping = (
                [ DBI::SQL_INTEGER(),   { pg_type => DBD::Pg::PG_INT8() }    ],
                [ DBI::SQL_SMALLINT(),  { pg_type => DBD::Pg::PG_INT4() }    ],
                [ DBI::SQL_TIMESTAMP(), DBI::SQL_TIMESTAMP()                 ],
                [ DBI::SQL_NUMERIC(),   DBI::SQL_NUMERIC()                   ],
                [ DBI::SQL_BOOLEAN(),   DBI::SQL_BOOLEAN()                   ],
                [ DBI::SQL_CHAR(),      DBI::SQL_CHAR()                      ],
                [ DBI::SQL_VARCHAR(),   { pg_type => DBD::Pg::PG_VARCHAR() } ],
                [ DBI::SQL_CLOB(),      { pg_type => DBD::Pg::PG_VARCHAR() } ],
                [ DBI::SQL_DATE(),      DBI::SQL_DATE()                      ],
                [ DBI::SQL_TIME(),      DBI::SQL_TIME()                      ],
                [ DBI::SQL_INTERVAL(),  DBI::SQL_INTERVAL()                  ],
                [ DBI::SQL_BLOB(),      { pg_type => DBD::Pg::PG_BYTEA() }   ],
        );

        foreach my $entry (@mapping) {
                my ($code, $mapped) = @$entry;
                return $mapped if $dbiType == $code;
        }

        #-- Anything else is bound as text
        return { pg_type => DBD::Pg::PG_TEXT() };
}

#--
#-- SUB: getSnapshotLogColumns
#-- Returns the snapshot log's filter/pk/oid columns
#--
sub getSnapshotLogColumns {
        #-- Arguments: database handle, master schema, master name
        my $dbh = shift;
        my ($masterschema, $mastername) = @_;

        #-- The driver call is evaluated in scalar context and its single
        #-- value is handed back to the caller
        return scalar(call_driver_function($dbh, 'snapshotlog_columns', undef, $masterschema, $mastername, undef));
}

#--
#-- SUB: getLogKeys
#-- Maps the MASTER keys to SNAPSHOT LOG keys
#--
sub getLogKeys {
        #-- Function parameters
        #--   $masterKeys : comma-separated list of key column names on the MASTER
        my ($masterKeys) = @_;

        #-- The master's 'oid' column is stored in the log as 'm_row$$';
        #-- every other key keeps its name
        my @arrLogKeys = map { ($_ eq 'oid') ? 'm_row$$' : $_ } split(/,/, $masterKeys);

        #-- Returns the comma-joined list and a reference to the array of names
        return (join(',', @arrLogKeys), \@arrLogKeys);
}

#--
#-- SUB: getSnapshotLogName
#-- Returns the snapshot log's table name
#--
sub getSnapshotLogName {
        #-- Arguments: database handle, master schema, master name
        my $dbh = shift;
        my ($masterschema, $mastername) = @_;

        #-- Neither an operation nor an additional argument is passed
        my @driverArgs = ('snapshotlog_name', undef, $masterschema, $mastername, undef);
        return call_driver_function($dbh, @driverArgs);
}

#--
#-- SUB: deleteOldSnapshotRecords
#-- Removes old records from the local snapshot based on the snapshot log
#--
sub deleteOldSnapshotRecords {
        #-- Function parameters
        #--   $dbh_local    : handle on which the snapshot rows are deleted
        #--   $dbh_remote   : handle on which the snapshot log is read
        #--   $snapshot     : hash reference; the keys 'query', 'pbt_table',
        #--                   'snapshotname', 'schemaname' and 'snapid' are read
        #--   $masterschema, $mastername : master object owning the snapshot log
        #--   $lastRefresh  : passed to the updated/deleted log filter
        #--   @keys         : ($masterKeys, $logKeys, ...) as comma-separated lists
        #-- Returns the number of log rows processed.
        my ($dbh_local, $dbh_remote, $snapshot, $masterschema, $mastername, $lastRefresh, @keys) = @_;

        #-- Local variables
        my $sql;
        my $filter;
        my $sth_local;
        my $sth_remote;
        my $recs;
        my $sqlRow;
        my ($masterKeys, $logKeys) = @keys;
        my $snapshotLogName = getSnapshotLogName($dbh_remote, $masterschema, $mastername);
        my $snapshotname;

        #-- Read the keys of the modified/removed records from the SNAPSHOT LOG
        $filter = getUpdatedDeletedLogRecordsFilter($dbh_remote, $lastRefresh);
        $sql = <<SQL;
SELECT $logKeys
FROM $masterschema.$snapshotLogName
WHERE $filter
SQL
        $sth_remote = $dbh_remote->prepare($sql);
        if ($dbh_remote->err) {
                elog ERROR, "Could not retrieve records to delete: $sql [".$dbh_remote->errstr."]";
        }
        $sth_remote->execute();
        if ($sth_remote->err) {
                elog ERROR, "Could not retrieve records to delete: $sql [".$sth_remote->errstr."]";
        }

        #-- Build "<col>=? AND <col>=? ..." over the snapshot-side key columns.
        #-- (The original indexed the array with '@master->[$i]', i.e. used an
        #-- array as a reference, which is a fatal error on Perl 5.22 and later.)
        my @master = getSourceKeysFromQuery($snapshot->{'query'}, split(',', $masterKeys));
        $sqlRow = join(' AND ', map { $_ . '=?' } @master);

        #-- Rows live either in the snapshot itself or in its 'pbt_table'
        $snapshotname = ($snapshot->{'pbt_table'} eq '') ? $snapshot->{'snapshotname'} : $snapshot->{'pbt_table'};
        $sql = <<SQL;
DELETE FROM $snapshot->{'schemaname'}.$snapshotname
WHERE $sqlRow
SQL
        if ($snapshot->{'pbt_table'} ne '') {
                #-- Restrict the delete to this snapshot's rows
                $sql .= " AND pbt\$ = $snapshot->{'snapid'}";
        }
        $sth_local = $dbh_local->prepare($sql);
        if ($dbh_local->err) {
                elog ERROR, "Could not erase old records from snapshot: $sql [".$dbh_local->errstr."]";
        }

        $recs = 0;
        #-- The pbt$ trigger is switched off while the rows are deleted
        setTriggerStatus($dbh_local, $snapshot->{'schemaname'}, $snapshotname, "${snapshotname}_pbt\$_trg", FALSE);
        while (my @row = $sth_remote->fetchrow_array) {
                $sth_local->execute(@row);
                if ($sth_local->err) {
                        elog ERROR, $sth_local->errstr;
                }
                ++$recs;
                #-- Commit in chunks of 1000 records
                if (($recs % 1000) == 0) {
                        $dbh_local->commit;
                        elog NOTICE, "Deleted. Record #$recs" if $main::DEBUG==1;
                }
        }
        setTriggerStatus($dbh_local, $snapshot->{'schemaname'}, $snapshotname, "${snapshotname}_pbt\$_trg", TRUE);

        #-- A fetch error ends the loop early; report it here
        if (! $sth_remote->err) {
                elog NOTICE, "Deleted $recs modified records" if $main::DEBUG==1;
        } else {
                elog ERROR, "Could not delete modified snapshot records: $sql [".$sth_remote->errstr."]";
        }

        $dbh_local->commit;
        $sth_local->finish;

        return $recs;
}

#--
#-- SUB: getSnapshotLogName
#-- Returns the snapshot log's table name
#--
sub getSnapshotLogName {
        #-- NOTE(review): a sub with this same name and body is already
        #-- defined earlier in this function body; this one redefines it.
        my $dbh = shift;
        my ($masterschema, $mastername) = @_;

        #-- Neither an operation nor an additional argument is passed
        my @driverArgs = ('snapshotlog_name', undef, $masterschema, $mastername, undef);
        return call_driver_function($dbh, @driverArgs);
}

#--
#-- SUB: getUnifiedSqlType
#-- Maps a set of SQL types into a single SQL type
#--
sub getUnifiedSqlType {
        #-- Function parameters
        #--   $dbiType : numeric DBI SQL type code
        #-- Returns the DBI SQL type code that represents the whole family the
        #-- input belongs to (DBI::SQL_VARCHAR when no family matches).
        my ($dbiType) = @_;

        #--------------------------------
        #-- DBI SQL type codes (name value):
        #--   GUID -11, WLONGVARCHAR -10, WVARCHAR -9, WCHAR -8, BIT -7, TINYINT -6,
        #--   BIGINT -5, LONGVARBINARY -4, VARBINARY -3, BINARY -2, LONGVARCHAR -1,
        #--   UNKNOWN_TYPE/ALL_TYPES 0, CHAR 1, NUMERIC 2, DECIMAL 3, INTEGER 4,
        #--   SMALLINT 5, FLOAT 6, REAL 7, DOUBLE 8, DATETIME/DATE 9, INTERVAL/TIME 10,
        #--   TIMESTAMP 11, VARCHAR 12, BOOLEAN 16, UDT 17, UDT_LOCATOR 18, ROW 19,
        #--   REF 20, BLOB 30, BLOB_LOCATOR 31, CLOB 40, CLOB_LOCATOR 41, ARRAY 50,
        #--   ARRAY_LOCATOR 51, MULTISET 55, MULTISET_LOCATOR 56, TYPE_DATE 91,
        #--   TYPE_TIME 92, TYPE_TIMESTAMP 93, TYPE_TIME_WITH_TIMEZONE 94,
        #--   TYPE_TIMESTAMP_WITH_TIMEZONE 95, INTERVAL_YEAR 101, INTERVAL_MONTH 102,
        #--   INTERVAL_DAY 103, INTERVAL_HOUR 104, INTERVAL_MINUTE 105,
        #--   INTERVAL_SECOND 106, INTERVAL_YEAR_TO_MONTH 107,
        #--   INTERVAL_DAY_TO_HOUR 108, INTERVAL_DAY_TO_MINUTE 109,
        #--   INTERVAL_DAY_TO_SECOND 110, INTERVAL_HOUR_TO_MINUTE 111,
        #--   INTERVAL_HOUR_TO_SECOND 112, INTERVAL_MINUTE_TO_SECOND 113
        #--
        #-- Codes 9 and 10 are shared (DATETIME/DATE and INTERVAL/TIME), and the
        #-- branches are tested in order: a plain SQL_DATE value is caught by
        #-- the TIMESTAMP branch and a plain SQL_INTERVAL value by the TIME branch.
        #--------------------------------
        if ($dbiType == DBI::SQL_INTEGER) {
                return DBI::SQL_INTEGER;
        } elsif ($dbiType == DBI::SQL_SMALLINT) {
                return DBI::SQL_SMALLINT;
        } elsif (($dbiType == DBI::SQL_TYPE_TIME_WITH_TIMEZONE)
                || ($dbiType == DBI::SQL_TYPE_TIMESTAMP_WITH_TIMEZONE)
                || ($dbiType == DBI::SQL_DATETIME)
                || ($dbiType == DBI::SQL_TIMESTAMP)
                || ($dbiType == DBI::SQL_TYPE_TIMESTAMP)) { #--TIMESTAMP
                return DBI::SQL_TIMESTAMP;
        } elsif (($dbiType == DBI::SQL_NUMERIC)
                || ($dbiType == DBI::SQL_DECIMAL)
                || ($dbiType == DBI::SQL_FLOAT)
                || ($dbiType == DBI::SQL_REAL)
                || ($dbiType == DBI::SQL_DOUBLE)
                || ($dbiType == DBI::SQL_TINYINT)) { #--NUMERIC
                #-- SQL_BIGINT is not matched here, so it falls through to the default
                return DBI::SQL_NUMERIC;
        } elsif (($dbiType == DBI::SQL_BOOLEAN)
                || ($dbiType == DBI::SQL_BIT)) { #--BOOLEAN
                return DBI::SQL_BOOLEAN;
        } elsif (($dbiType == DBI::SQL_CHAR)
                || ($dbiType == DBI::SQL_WCHAR)) { #--CHAR
                return DBI::SQL_CHAR;
        } elsif (($dbiType == DBI::SQL_VARCHAR)
                || ($dbiType == DBI::SQL_WVARCHAR)) { #--VARCHAR
                return DBI::SQL_VARCHAR;
        } elsif (($dbiType == DBI::SQL_LONGVARCHAR)
                || ($dbiType == DBI::SQL_WLONGVARCHAR)
                || ($dbiType == DBI::SQL_CLOB)) { #--CLOB
                return DBI::SQL_CLOB;
        } elsif (($dbiType == DBI::SQL_DATE)
                || ($dbiType == DBI::SQL_TYPE_DATE)) { #--DATE
                return DBI::SQL_DATE;
        } elsif (($dbiType == DBI::SQL_TIME)
                || ($dbiType == DBI::SQL_TYPE_TIME)) { #--TIME
                return DBI::SQL_TIME;
        } elsif (($dbiType == DBI::SQL_INTERVAL)
                || ($dbiType == DBI::SQL_INTERVAL_YEAR)
                || ($dbiType == DBI::SQL_INTERVAL_MONTH)
                || ($dbiType == DBI::SQL_INTERVAL_DAY)
                || ($dbiType == DBI::SQL_INTERVAL_HOUR)
                || ($dbiType == DBI::SQL_INTERVAL_MINUTE)
                || ($dbiType == DBI::SQL_INTERVAL_SECOND)
                || ($dbiType == DBI::SQL_INTERVAL_YEAR_TO_MONTH)
                || ($dbiType == DBI::SQL_INTERVAL_DAY_TO_HOUR)
                || ($dbiType == DBI::SQL_INTERVAL_DAY_TO_MINUTE)
                || ($dbiType == DBI::SQL_INTERVAL_DAY_TO_SECOND)
                || ($dbiType == DBI::SQL_INTERVAL_HOUR_TO_MINUTE)
                || ($dbiType == DBI::SQL_INTERVAL_HOUR_TO_SECOND)
                || ($dbiType == DBI::SQL_INTERVAL_MINUTE_TO_SECOND)) { #--INTERVAL
                return DBI::SQL_INTERVAL;
        } elsif (($dbiType == DBI::SQL_BINARY)
                || ($dbiType == DBI::SQL_VARBINARY)
                || ($dbiType == DBI::SQL_LONGVARBINARY)
                || ($dbiType == DBI::SQL_BLOB)) { #--BLOB
                #-- Return the BLOB type code itself. The original returned the
                #-- result of the comparison '$dbiType == DBI::SQL_BLOB', i.e. a
                #-- boolean (1 or ''), so binary columns were reported as code 1
                #-- (SQL_CHAR) or as an empty value.
                return DBI::SQL_BLOB;
        } else {
                return DBI::SQL_VARCHAR; #-- default: VARCHAR
        }
}

#--
#-- SUB: getUpdatedDeletedLogRecordsFilter
#-- Returns the filter for retrieving snapshot log's updated or deleted records
#--
sub getUpdatedDeletedLogRecordsFilter {
        #-- Arguments: database handle and the last refresh time
        my $dbh = shift;
        my $lastRefresh = shift;

        #-- Operation, master schema and master name are not passed to this driver call
        my @driverArgs = ('snapshotlog_ud_filter', undef, undef, undef, $lastRefresh);
        return call_driver_function($dbh, @driverArgs);
}
$BODY$
  LANGUAGE 'plperlu' VOLATILE;
-- Assign ownership of refresh_snapshot(text, text) to the postgres role.
ALTER FUNCTION refresh_snapshot(schemaname text, snapshotname text) OWNER TO postgres;
-- Attach a catalog description to refresh_snapshot(text, text).
COMMENT ON FUNCTION refresh_snapshot(schemaname text, snapshotname text) IS $$
This function is part of PostgreSQL::Snapshots project.
This is the function that refreshes the snapshot. The refreshing process may take a long time depending on the size of the result, the connection speed, etc. This is not a memory consuming process once all the process is divided in transaction chunks of 1000 records. It uses more CPU (because of Perl) and network (because of the resultset size) than memory.
$$;
--
-- ATTENTION: This is a free software.
--            View the LICENSE.txt file for license information
--

------------------------------------------------------------------------------
-- FUNCTION: public.create_snapshot_log
--
-- Creates a SNAPSHOT LOG on a TABLE to be used on FAST refreshes
------------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION public.create_snapshot_log(schemaname text, mastername text, withwhat text)
  RETURNS bool AS
$BODY$
use strict;
use DBI;
use constant TRUE => 1;
use constant FALSE => "";
#-- Function parameters
#--   $schemaname : schema of the master object
#--   $mastername : name of the master object
#--   $withwhat   : the WITH clause, parsed by getKeyColumns
my ($schemaname, $mastername, $withwhat) = @_;
#-- Set this to 1 for debugging messages
$main::DEBUG=0;
#-- Localhost superuser connection
my $dbh_local = DBI->connect('dbi:Pg:dbname=labpeTeste', 'postgres', undef, {AutoCommit => 0});
if (! defined $dbh_local) {
        #-- Without this check a failed connection only surfaces later as a
        #-- method call on an undefined value
        elog ERROR, "Could not connect to the local database: $DBI::errstr";
}
#-- variables
#-- NOTE(review): these are declared at this outer scope and subs defined
#-- further down may refer to them, so none of them is removed here.
my $sql;
my $rs;
my $row;
my $sth;
my $masterKeyColumns;
my $logKeyColumns;
my $masterPkColumns;
my $masterFilterColumns;
my $flag;
my $log;

#-- Test if the SNAPSHOT LOG already exists
#-- ($sql is never assigned at this point, so it is no longer appended to
#-- the two messages below)
if (snapshotLogExists($dbh_local, $schemaname, $mastername)) {
        elog ERROR, "Snapshot log on '$schemaname.$mastername' already exists!";
}

if (! objectExists($dbh_local, $schemaname, $mastername)) {
        elog ERROR, "Master '$schemaname.$mastername' does not exist!";
}

($flag, $masterKeyColumns, $logKeyColumns, $masterPkColumns, $masterFilterColumns) = getKeyColumns($dbh_local, $schemaname, $mastername, $withwhat);
elog NOTICE, 'KEY='.join(',', keys %$logKeyColumns) . ' TYPES='.join(',', values %$logKeyColumns) . ' FLAG='.$flag if $main::DEBUG == 1;

#-- Create the Snapshot's Log Table, its indexes and the trigger on the master
$log = createSnapshotLogTable($schemaname, $mastername, $logKeyColumns);
createSnapshotLogTableIndexes($schemaname, $mastername, $log, $logKeyColumns);
createSnapshotLogTrigger($dbh_local, $schemaname, $mastername, $log, $masterKeyColumns, $logKeyColumns);

#-- Register the log in the catalog tables
createSnapshotLogEntry($dbh_local, $schemaname, $mastername, $flag, $log, $masterPkColumns, $masterFilterColumns);

#--call_driver_function($dbh_local, 'snapshot_do', 'REGISTER', $schemaname, $mastername, $snapid);

$dbh_local->commit;
$dbh_local->disconnect;
#-- All done. Let's return TRUE
return TRUE;

#--
#-- SUB: sqlLookup
#-- Returns a one row result from a SQL query
#--
sub sqlLookup {
        #-- Arguments: database handle, SQL text, then the bind values
        my $dbh = shift;
        my $sql = shift;
        my @params = @_;

        #-- Strict variant: sqlLookup2 is told that errors are fatal
        return sqlLookup2($dbh, $sql, \@params, TRUE);
}
sub sqlLookupSoft {
        #-- Arguments: database handle, SQL text, then the bind values
        my $dbh = shift;
        my $sql = shift;
        my @params = @_;

        #-- Soft variant: sqlLookup2 is told that errors are not fatal
        return sqlLookup2($dbh, $sql, \@params, FALSE);
}

#--
#-- SUB: sqlLookup2
#-- Runs a query and returns its first row as a list.
#--   $dbh            : database handle
#--   $sql            : SQL text, may contain placeholders
#--   $params         : array reference with the bind values (may be empty)
#--   $errorsAreFatal : when true, a prepare/execute failure raises an ERROR;
#--                     when false, an empty list is returned instead
#--
sub sqlLookup2 {
        my ($dbh, $sql, $params, $errorsAreFatal) = @_;
        my @row = ();

        elog NOTICE, "Lookup at SQL:$sql with ".(@$params ? join(',', @$params) : '<none>') if $main::DEBUG==1;

        my $sth = $dbh->prepare($sql);
        if (! $sth) {
                #-- prepare() failed: there is no statement handle to execute
                if ($errorsAreFatal) {
                        elog ERROR, "Fatal error! SQL=$sql ERROR=".$dbh->errstr;
                }
                return @row;
        }

        #-- An empty bind list is equivalent to calling execute() without arguments
        $sth->execute(@$params);
        if ($sth->err) {
                if ($errorsAreFatal) {
                        elog ERROR, "Fatal error! SQL=$sql ERROR=".$sth->errstr;
                }
                #-- Soft mode: nothing can be fetched from a failed statement
                $sth->finish;
                return @row;
        }

        @row = $sth->fetchrow_array;
        $sth->finish;
        return @row;
}
#--
#-- SUB: snapshotLogExists
#-- Returns whether a master object has a SNAPSHOT LOG
#--
sub snapshotLogExists {
        #-- Arguments: database handle, master schema, master name
        my $dbh = shift;
        my ($masterschema, $mastername) = @_;

        #-- The driver answers with the string 'T' when the log exists
        my @driverArgs = ('snapshotlog_exists', undef, $masterschema, $mastername, undef);
        return (call_driver_function($dbh, @driverArgs) eq 'T');
}
#--
#-- SUB: objectExists
#-- Returns whether an object exists or not
#--
sub objectExists {
        #-- Function parameters
        #--   $dbh          : database handle
        #--   $schemaname   : schema (pg_namespace.nspname) to look in
        #--   $snapshotname : relation name (pg_class.relname) to look for
        #-- Returns true when at least one matching pg_class row exists.
        my ($dbh, $schemaname, $snapshotname) = @_;

        # local variables
        my $sql;
        my $sth;
        my $row;

        $sql = <<SQL;
SELECT count(*) as total
FROM pg_catalog.pg_class c
        LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname=? AND c.relname = ?
SQL
        $sth = $dbh->prepare($sql);
        if (! $sth) {
                elog ERROR, "Could not check whether '$schemaname.$snapshotname' exists. ERROR='".$dbh->errstr."' SQL='$sql'";
        }
        $sth->execute(($schemaname, $snapshotname));
        if ($sth->err) {
                #-- Without this check a failed query left $row undefined and the
                #-- string comparison below reported the object as existing
                elog ERROR, "Could not check whether '$schemaname.$snapshotname' exists. ERROR='".$sth->errstr."' SQL='$sql'";
        }
        $row = $sth->fetchrow_hashref;
        #-- Release the statement handle; only the single count row is needed
        $sth->finish;
        return ($row->{total} != 0);
}
#--
#-- SUB: getKeyColumns
#-- Returns the keys(on master and on snapshot log) involved on a "create snapshot WITH" statement
#--
sub getKeyColumns {
        #-- Function parameters
        #--   $dbh        : database handle
        #--   $schemaname : schema of the master object
        #--   $mastername : name of the master object
        #--   $withwhat   : WITH clause: a comma-separated list of 'oid' and/or
        #--                 'primary key', optionally followed by a parenthesised
        #--                 comma-separated list of additional (filter) columns
        #-- Returns ($flag, \%masterKeyColumns, \%logKeyColumns,
        #--          \%masterPkColumns, \%masterFilterColumns)
        my ($dbh, $schemaname, $mastername, $withwhat) = @_;

        #-- Local variables
        my $sth;
        my %masterKeyColumns = ();
        my %logKeyColumns = ();
        my %masterFilterColumns = ();
        my %masterPkColumns = ();
        my %tbFields = ();
        my @pk;
        my $fields;
        my $flag = 0x20; #-- ALL=0x20 HASPK=0x40 ROWID=0x01 PK=0x02 FILTER=0x04

        #-- Get object's metadata
        $sth = getObjectMeta($dbh, $schemaname, $mastername);

        #-- Creates a hashmap for FIELD NAME to FIELD TYPE mapping.
        #-- (Plain hash element access; the original '%tbFields->{...}' form
        #-- uses a hash as a reference, a fatal error on Perl 5.22 and later.)
        for my $i (0 .. $sth->{NUM_OF_FIELDS} - 1) {
                my $fieldname = lc($sth->{NAME}->[$i]);
                my $type = $dbh->type_info( [ $sth->{TYPE}->[$i] ] )->{TYPE_NAME};
                $tbFields{$fieldname} = $type;
        }

        #-- Get the table's primary key fields
        @pk = $dbh->primary_key( undef, $schemaname, $mastername );
        $dbh->commit;

        #elog NOTICE, 'MASTER=(FIELDS:'.join(',', keys %tbFields).' TYPES:'.join(',', values %tbFields).' PK:'.join(',', @pk).')'  if $main::DEBUG==1;

        $withwhat = lc($withwhat);
        $fields = '';
        if ( $withwhat =~ m/.*\(.*\).*/ ) {
                #-- We have parenthesis = additional fields
                $fields = $withwhat;
                #-- Save the additional fields
                $fields =~ s/^.*\((.*)\).*$/$1/;
                #-- Remove the additional fields from WITH
                $withwhat =~ s/^(.*)\(.*\)(.*)$/$1$2/;
        }

        #-- Parse the remaining arguments of WITH.
        #-- Surrounding whitespace is ignored, so that e.g. 'primary key (col)'
        #-- or 'oid, primary key' are accepted; tokens that are empty after
        #-- trimming are skipped.
        foreach my $token (split(/,/, $withwhat)) {
                $token =~ s/^\s+|\s+$//g;
                next if $token eq '';
                if ($token eq 'oid') {
                        $flag |= 0x01;
                        $masterKeyColumns{'oid'} = 1;
                        #-- The master's oid is stored in the log column m_row$$
                        $logKeyColumns{'m_row$$'} = 'oid';
                } elsif ($token =~ m/^primary\s+key$/) {
                        $flag |= 0x02;
                        $flag |= 0x40;
                        if (! @pk) {
                                elog ERROR, 'Source does not have a PRIMARY KEY';
                        }
                        foreach my $pkColumn (@pk) {
                                $masterPkColumns{$pkColumn} = 1;
                                $masterKeyColumns{$pkColumn} = 1;
                                $logKeyColumns{$pkColumn} = $tbFields{$pkColumn};
                        }
                } else {
                        elog ERROR, 'Syntax error on WITH clause: '.$token;
                }
        }

        #-- Parse the additional fields of WITH
        foreach my $column (split(/,/, $fields)) {
                $column =~ s/^\s+|\s+$//g;
                my $type = $tbFields{$column};
                #-- Unknown column, or a column without a type name
                if (! defined $type || $type eq '') {
                        elog ERROR, "Column not found: [$column]";
                }
                $masterFilterColumns{$column} = 1;
                $masterKeyColumns{$column} = 1;
                $logKeyColumns{$column} = $type;
                $flag |= 0x04;
        }
        #--elog NOTICE, 'KEY='.join(',', keys %logKeyColumns) . ' TYPES='.join(',', values %logKeyColumns) if $main::DEBUG == 1;
        return ($flag, \%masterKeyColumns, \%logKeyColumns, \%masterPkColumns, \%masterFilterColumns);
}
#--
#-- SUB: createSnapshotLogTable
#-- Creates the SNAPSHOT LOG table
#--
sub createSnapshotLogTable {
        #-- Function parameters
        #--   $schemaname    : schema in which the log table is created
        #--   $mastername    : master object name; the log is named mlog$_<mastername>
        #--   $logKeyColumns : hash reference, column name => column type
        #-- Returns the name of the log table (without schema).
        my ($schemaname, $mastername, $logKeyColumns) = @_;

        my $strcolumns = '';
        my $sql;
        my $sth;

        #-- One "<name> <type>," line per key column
        while (my ($key,$value) = each %$logKeyColumns) {
                elog NOTICE, "$key=>$value" if $main::DEBUG==1;
                my $entry = "$key $value";
                $strcolumns = "$strcolumns$entry,\n";
        }

        $sql = <<SQL;
CREATE TABLE $schemaname.mlog\$_$mastername (
        $strcolumns
        snaptime\$\$ timestamp,
        dmltype\$\$ varchar(1),
        old_new\$\$ varchar(1),
        change_vector\$\$ varchar(255)
)
SQL

        elog NOTICE, $sql if $main::DEBUG==1;

        $sth = spi_exec_query($sql);

        if ($sth->{status} ne 'SPI_OK_UTILITY') {
                #-- The result is a plain hash reference, so the status is read
                #-- as a hash element ('$sth->status' would be a method call on
                #-- an unblessed reference and die before the message is built)
                elog ERROR, "Could not create the snapshot log for table '$schemaname.$mastername'. STATUS='".$sth->{status}."' SQL='$sql'";
        }

        return "mlog\$_$mastername";
}
#--
#-- SUB: createSnapshotLogTableIndexes
#-- Creates the INDEXES on the SNAPSHOT LOG table, for performance reasons only
#--
sub createSnapshotLogTableIndexes {
        #-- Function parameters
        #--   $schemaname    : schema of the log table
        #--   $mastername    : master object name (used in error messages)
        #--   $log           : name of the log table
        #--   $logKeyColumns : hash reference whose keys are the log key columns
        my ($schemaname, $mastername, $log, $logKeyColumns) = @_;

        #-- Index 1: the key columns; index 2: snaptime$$ and dmltype$$
        my $keys = join(',', keys %$logKeyColumns);
        my $keyIndexSql = <<SQL;
CREATE INDEX ${log}_ix1 on $schemaname.$log ($keys);
SQL
        my $timeIndexSql = <<SQL;
CREATE INDEX ${log}_ix2 on $schemaname.$log (snaptime\$\$, dmltype\$\$);
SQL

        foreach my $sql ($keyIndexSql, $timeIndexSql) {
                my $sth = spi_exec_query($sql);
                if ($sth->{status} ne 'SPI_OK_UTILITY') {
                        #-- The result is a plain hash reference, so the status is
                        #-- read as a hash element ('$sth->status' would be a method
                        #-- call on an unblessed reference)
                        elog ERROR, "Could not create the snapshot log index for table '$schemaname.$mastername'. STATUS='".$sth->{status}."' SQL='$sql'";
                }
        }
}
#--
#-- SUB: createSnapshotLogTrigger
#-- Creates the triggers on the MASTER object in order to fill the SNAPSHOT LOG entries
#--
sub createSnapshotLogTrigger {
        #-- Function parameters
        #--   $dbh              : database handle used to create the function and trigger
        #--   $schemaname       : schema of the master object and of the log
        #--   $mastername       : master object the trigger is created on
        #--   $log              : name of the snapshot log table
        #--   $masterKeyColumns : hash reference whose keys are the master key columns
        #--   $logKeyColumns    : hash reference of the log key columns (not read here)
        my ($dbh, $schemaname, $mastername, $log, $masterKeyColumns, $logKeyColumns) = @_;

        #-- Local variables
        my $sth;
        my $sql;

        #-- Get object's metadata
        $sth = getObjectMeta($dbh, $schemaname, $mastername);
        my $numFields = $sth->{NUM_OF_FIELDS};
        my $remainder = $numFields % 8;

        #-- Constant change vectors used for DELETE (all unset) and INSERT (all set)
        my $all_unset_cv = '0' x int($numFields / 8);
        my $all_set_cv = 'F' x int($numFields / 8);

        if ($remainder != 0) {
                $all_unset_cv = $all_unset_cv . '0';
                #-- All bits of the partial group set: 2**remainder - 1.
                #-- (The original wrote '2^(...)-1'; in Perl '^' is bitwise XOR
                #-- and binds looser than '-', so it did not compute a power.)
                $all_set_cv = $all_set_cv.sprintf("%x", (2 ** $remainder) - 1);
        }

        #-- Create the PL/PgSQL code for computing the CHANGEVECTOR field
        my $compute_change_vector = '';

        for (my $i = 0; $i < $numFields; ++$i) {
                my $fieldname = $sth->{NAME}->[$i];
                my $value = 2**$i;
                #-- IS DISTINCT FROM also detects changes to or from NULL, which
                #-- a plain '<>' comparison silently skips
                $compute_change_vector = $compute_change_vector . <<SQL;
                        IF (OLD.$fieldname IS DISTINCT FROM NEW.$fieldname) THEN
                                CVB = CVB + $value;
                        END IF;
SQL
                #-- After every 8th field the accumulated value is appended to CV
                if ((($i + 1) % 8) == 0) {
                        $compute_change_vector = $compute_change_vector . <<SQL;
                        CV = CV || to_hex(CVB);
                        ALL_CVB = ALL_CVB + CVB;
                        CVB = 0;
SQL
                }
        }
        if ($remainder != 0) {
                $compute_change_vector = $compute_change_vector . <<SQL;
                        CV = CV || to_hex(CVB);
                        ALL_CVB = ALL_CVB + CVB;
SQL
        }

        $sth->finish;
        $dbh->commit;

        #-- Create the TRIGGER function
        elog NOTICE, "Creating trigger function $schemaname.${log}_trgfn()" if $main::DEBUG==1;

        #-- The INSERTs name their target columns explicitly, so they no longer
        #-- depend on the iteration order of two different hashes matching the
        #-- column order of the log table. The master's 'oid' key is stored in
        #-- the log column m_row$$; every other key keeps its name.
        my @masterKeys = sort keys %$masterKeyColumns;
        my $logColumns = join(', ',
                (map { ($_ eq 'oid') ? 'm_row$$' : $_ } @masterKeys),
                'snaptime$$', 'dmltype$$', 'old_new$$', 'change_vector$$');
        my $oldMasterKeys = join('', map { "OLD.$_, " } @masterKeys);
        my $newMasterKeys = join('', map { "NEW.$_, " } @masterKeys);

        $sql = <<SQL;
CREATE OR REPLACE FUNCTION $schemaname.${log}_trgfn()
RETURNS trigger AS
\$BODYFN\$
DECLARE
        CV varchar(255);
        CVB integer;
        ALL_CVB numeric;
BEGIN
        IF (TG_OP = 'DELETE') THEN
                INSERT INTO $schemaname.$log ($logColumns) SELECT ${oldMasterKeys}NULL,'D','O','$all_unset_cv';
        ELSIF (TG_OP = 'UPDATE') THEN
                CV = '';
                CVB = 0;
                ALL_CVB = 0;
$compute_change_vector
                IF (ALL_CVB > 0) THEN
                        INSERT INTO $schemaname.$log ($logColumns) SELECT ${oldMasterKeys}NULL,'U','O',CV;
                END IF;
                IF (OLD.oid <> NEW.oid) THEN
                        INSERT INTO $schemaname.$log ($logColumns) SELECT ${newMasterKeys}NULL,'U','N',CV;
                END IF;
        ELSIF (TG_OP = 'INSERT') THEN
                INSERT INTO $schemaname.$log ($logColumns) SELECT ${newMasterKeys}NULL,'I','N','$all_set_cv';
        END IF;
        RETURN NULL;
END;
\$BODYFN\$
LANGUAGE 'plpgsql' VOLATILE;
SQL

        elog NOTICE, $sql if $main::DEBUG==1;

        $dbh->do($sql);
        if ($dbh->err) {
                elog ERROR, "Could not create the snapshot log trigger function for table '$schemaname.$mastername'. ERR='".$dbh->errstr."' SQL='$sql'";
        }

        #-- Create TRIGGER on master table
        elog NOTICE, "Creating trigger ${log}_trg on $schemaname.${mastername}" if $main::DEBUG==1;
        $sql = <<SQL;
CREATE TRIGGER ${log}_trg AFTER INSERT OR UPDATE OR DELETE
ON $schemaname.${mastername} FOR EACH ROW
EXECUTE PROCEDURE $schemaname.${log}_trgfn ()
SQL

        $dbh->do($sql);
        if ($dbh->err) {
                elog ERROR, "Could not create the snapshot log trigger on table '$schemaname.$mastername'. ERR='".$dbh->errstr."' SQL='$sql'";
        }
}
#--
#-- SUB: createSnapshotLogEntry
#-- Registers the SNAPSHOT LOG in public.pg_mlogs, reads back its snaplogid
#-- and hands over to createSnapshotLogEntryColumns
#--
sub createSnapshotLogEntry {
        #-- Function parameters
        #--   $dbh                 : database handle
        #--   $schemaname          : schema of the master object
        #--   $mastername          : name of the master object
        #--   $flag                : flag value stored with the log entry
        #--   $log                 : name of the snapshot log table
        #--   $masterPkColumns     : passed through to createSnapshotLogEntryColumns
        #--   $masterFilterColumns : passed through to createSnapshotLogEntryColumns
        my ($dbh, $schemaname, $mastername, $flag, $log, $masterPkColumns, $masterFilterColumns) = @_;

        my $sth;
        #-- Declared locally; the original assigned to a $sql that was only
        #-- declared outside this sub
        my $sql;

        #-- Add the Snapshot Log to the public.pg_mlogs table
        $sql = <<SQL;
        INSERT INTO public.pg_mlogs(masterschema, mastername, flag, log)
        VALUES (?,?,?,?)
SQL
        $sth = $dbh->prepare($sql);
        if (! $sth) {
                #-- prepare() returned no handle, so the error is read from $dbh
                elog ERROR, "Could not prepare the snapshot log entry on system table 'public.pg_mlogs'. ERROR='".$dbh->errstr."' SQL='$sql'";
        }
        $sth->execute(($schemaname, $mastername, $flag, $log));
        if ($sth->err) {
                elog ERROR, "Could not create the snapshot log entry on system table 'public.pg_mlogs'. ERROR='".$sth->errstr."' SQL='$sql'";
        }

        my $snaplogid;
        my @row;

        #-- Read back the id of the entry just inserted
        $sql = <<SQL;
        SELECT snaplogid
        FROM public.pg_mlogs
        WHERE masterschema=?
                AND mastername=?
SQL
        $sth = $dbh->prepare($sql);
        if (! $sth) {
                elog ERROR, $dbh->errstr."' SQL='$sql'";
        }
        $sth->execute(($schemaname, $mastername));
        if ($sth->err) {
                elog ERROR, $sth->errstr."' SQL='$sql'";
        }
        @row = $sth->fetchrow_array;
        #-- Only the first row is needed; release the statement handle
        $sth->finish;
        $snaplogid = $row[0];

        elog NOTICE, 'SNAPLOGID='.$snaplogid if $main::DEBUG == 1;

        createSnapshotLogEntryColumns($dbh, $schemaname, $mastername, $flag, $log, $snaplogid, $masterPkColumns, $masterFilterColumns);
}
#--
#-- SUB: call_driver_function
#-- Dispatches a call to one of the known native driver routines on the
#-- database behind the handle ($dbh). Schema and master names are upper-cased
#-- before use. 'snapshot_do' is run as a stored procedure and followed by a
#-- commit; every other known name is invoked as a function and its value is
#-- returned. An unknown name is reported through elog ERROR.
#--
sub call_driver_function {
        my ($dbh, $function_name, $operation, $masterschema, $mastername, $additional) = @_;
        $masterschema = uc($masterschema);
        $mastername = uc($mastername);

        elog NOTICE, "Calling DRIVER function $function_name ($operation, $masterschema, $mastername, $additional)" if $main::DEBUG==1;

        #-- Argument list of every driver routine that is called as a function
        my %function_args = (
                'snapshotlog_exists'      => [$masterschema, $mastername],
                'last_log_refresh'        => [$masterschema, $mastername, $additional],
                'snapshotlog_columns'     => [$masterschema, $mastername],
                'snapshotlog_name'        => [$masterschema, $mastername],
                'count_log_modified_rows' => [$masterschema, $mastername, $additional],
                'snapshotlog_ud_filter'   => [$additional],
                'snapshotlog_iu_filter'   => [$additional],
        );

        if ($function_name eq 'snapshot_do') {
                dbi_execute_stored_procedure($dbh, 'snapshot_do', ('123456', $operation, $masterschema, $mastername, $additional));
        } elsif (exists $function_args{$function_name}) {
                return dbi_call_function($dbh, $function_name, @{ $function_args{$function_name} });
        } else {
                elog ERROR, "Could not call DRIVER function: $function_name";
        }
        #-- Only reached after the stored procedure call
        $dbh->commit;
}

#--
#-- SUB: dbi_execute_stored_procedure
#-- Runs the stored procedure $procname on the database behind $dbh, binding
#-- @parameters to '?' placeholders. The call syntax depends on the DBMS name
#-- reported by the handle. Any result is discarded.
#-- Failures are reported through elog ERROR.
#--
sub dbi_execute_stored_procedure {
        use DBI::Const::GetInfoType;
        my ($dbh, $procname, @parameters) = @_;
        my $dbname = lc($dbh->get_info( $GetInfoType{SQL_DBMS_NAME} ));

        #-- One '?' placeholder per parameter
        my $qmarks = join(',', ('?') x @parameters);

        #-- Oracle gets an anonymous PL/SQL block; every other DBMS
        #-- (PostgreSQL included) gets a plain SELECT
        my $sql = ($dbname eq 'oracle')
                ? "BEGIN $procname($qmarks); END;"
                : "SELECT $procname($qmarks)";

        my $sth = $dbh->prepare($sql);
        #-- prepare() yields undef on failure, so the error is read from $dbh
        if (! $sth || $dbh->err) {
                elog ERROR, "Could not prepare the call to procedure '$procname'. ERROR='".$dbh->errstr."' SQL='$sql'";
        }
        $sth->execute(@parameters);
        if ($sth->err) {
                elog ERROR, "Could not execute procedure '$procname'. ERROR='".$sth->errstr."' SQL='$sql'";
        }
        #-- The result is not fetched; release the handle so it is not left active
        $sth->finish;
        return;
}

#--
#-- SUB: dbi_call_function
#-- Calls the function $funcname on the database behind $dbh, binding
#-- @parameters to '?' placeholders, and returns the first column of the first
#-- result row (undef when no row comes back). The call syntax depends on the
#-- DBMS name reported by the handle.
#-- Failures are reported through elog ERROR.
#--
sub dbi_call_function {
        use DBI::Const::GetInfoType;
        my ($dbh, $funcname, @parameters) = @_;
        my $dbname = lc($dbh->get_info( $GetInfoType{SQL_DBMS_NAME} ));

        #-- One '?' placeholder per parameter
        my $qmarks = join(',', ('?') x @parameters);

        #-- Oracle needs FROM DUAL; every other DBMS (PostgreSQL included)
        #-- gets a bare SELECT
        my $sql = ($dbname eq 'oracle')
                ? "SELECT $funcname($qmarks) FROM DUAL"
                : "SELECT $funcname($qmarks)";

        my $sth = $dbh->prepare($sql);
        #-- prepare() yields undef on failure, so the error is read from $dbh
        if (! $sth || $dbh->err) {
                elog ERROR, "Could not prepare the call to function '$funcname'. ERROR='".$dbh->errstr."' SQL='$sql'";
        }
        $sth->execute(@parameters);
        if ($sth->err) {
                elog ERROR, "Could not execute function '$funcname'. ERROR='".$sth->errstr."' SQL='$sql'";
        }
        my @row = $sth->fetchrow_array;
        #-- Only the first row is used; release the statement handle
        $sth->finish;
        return $row[0];
}

#-- Dependencies

#--
#-- SUB: getObjectMeta
#-- Returns an empty result STATEMENT in order to retrieve the object's METADATA.
#-- The statement selects every column of $schemaname.$mastername with a
#-- condition that matches no row. Returns undef when the statement could not
#-- be prepared; an execute error is left on the returned handle.
#--
sub getObjectMeta {
        my ($dbh, $schemaname, $mastername) = @_;

        my $sql = <<SQL;
SELECT * from $schemaname.$mastername
WHERE 1=0
SQL
        my $sth = $dbh->prepare($sql);
        #-- prepare() yields undef on failure, so test the handle before using it
        if ($sth && ! $sth->err) {
                $sth->execute();
        }
        return $sth;
}

#--
#-- SUB: createSnapshotLogEntryColumns
#-- Records the columns of a snapshot log in public.pg_mlog_refcols, one row
#-- per column name.
#--
#-- Parameters:
#--   $dbh                 - DBI database handle used for the inserts
#--   $schemaname          - schema of the master table
#--   $mastername          - name of the master table
#--   $flag                - bitmask deciding which column groups are recorded
#--   $log                 - name of the snapshot log table (not used here)
#--   $snaplogid           - id of the owning public.pg_mlogs entry
#--   $masterPkColumns     - hash ref whose keys are the primary key column names
#--   $masterFilterColumns - hash ref whose keys are the filter column names
#--
#-- Failures are reported through elog ERROR.
#--
sub createSnapshotLogEntryColumns {
        my ($dbh, $schemaname, $mastername, $flag, $log, $snaplogid, $masterPkColumns, $masterFilterColumns) = @_;

        #-- Add the Snapshot Log Columns to the public.pg_mlog_refcols table
        my $sql = <<SQL;
        INSERT INTO public.pg_mlog_refcols(snaplogid, masterschema, mastername, colname, flag)
        VALUES (?,?,?,?,?)
SQL
        my $sth = $dbh->prepare($sql);
        #-- prepare() yields undef on failure, so the error is read from $dbh
        if (! $sth) {
                elog ERROR, "Could not prepare the snapshot log column entry on system table 'public.pg_mlog_refcols'. ERROR='".$dbh->errstr."' SQL='$sql'";
        }

        if ($flag & 0x42) {
                #-- PK
                #-- keys() restarts the hash iterator, so no column is skipped
                for my $key (keys %$masterPkColumns) {
                        $sth->execute($snaplogid, $schemaname, $mastername, $key, 2);
                        if ($sth->err) {
                                elog ERROR, "Could not create the snapshot log column entry on system table 'public.pg_mlog_refcols'. ERROR='".$sth->errstr."' SQL='$sql'";
                        }
                }
        }
        if ($flag & 0x04) {
                #-- FILTER
                #-- NOTE(review): filter columns are stored with flag 2, the same
                #-- value as PK columns - confirm this is intended
                for my $key (keys %$masterFilterColumns) {
                        $sth->execute($snaplogid, $schemaname, $mastername, $key, 2);
                        if ($sth->err) {
                                elog ERROR, "Could not create the snapshot log column entry on system table 'public.pg_mlog_refcols'. ERROR='".$sth->errstr."' SQL='$sql'";
                        }
                }
        }
}

$BODY$
  LANGUAGE 'plperlu' VOLATILE;
ALTER FUNCTION public.create_snapshot_log(schemaname text, mastername text, withwhat text) OWNER TO postgres;
COMMENT ON FUNCTION public.create_snapshot_log(schemaname text, mastername text, withwhat text) IS $$
This function is part of PostgreSQL::Snapshots project.
This is the function that creates a snapshot log.
$$;

------------------------------------------------------------------------------
-- FUNCTION: public.drop_snapshot_log
--
-- Removes a previously created SNAPSHOT LOG
------------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION public.drop_snapshot_log(schemaname text, mastername text)
  RETURNS bool AS
$BODY$
#--
#-- Drops the snapshot log of the master table $schemaname.$mastername.
#-- Steps, in order: verify the log exists and look up its name, drop the log
#-- table (through SPI), delete its public.pg_mlogs entry, drop the trigger on
#-- the master table and drop the trigger function (through the DBI
#-- connection), then commit and disconnect.
#-- Returns TRUE on success; every failure is reported through elog ERROR.
#--
use strict;
use DBI;
use constant TRUE => 1;
use constant FALSE => "";
#-- Function parameters
my ($schemaname, $mastername) = @_;
#-- Set this to 1 for debugging messages
$main::DEBUG=0;
#-- Localhost superuser connection
#-- NOTE(review): the database name is hard-coded; confirm it matches the
#-- database this function is installed in
my $dbh_local = DBI->connect('dbi:Pg:dbname=labpeTeste', 'postgres', undef, {AutoCommit => 0});
if (! $dbh_local) {
        elog ERROR, "Could not connect to the local database. ERR='".$DBI::errstr."'";
}
my $sql;
my $rs;
my $sth;
my $log;

if (! snapshotLogExists($dbh_local, $schemaname, $mastername)) {
        elog ERROR, "Snapshot log on '$schemaname.$mastername' does not exist!";
}
elog NOTICE, "Snapshot log on '$schemaname.$mastername' exists..." if $main::DEBUG==1;

$log = getSnapshotLogName($dbh_local, $schemaname, $mastername);

elog NOTICE, "Snapshot log on '$schemaname.$mastername' is $log" if $main::DEBUG==1;

$dbh_local->commit;

#-- Drop the Snapshot's Log Table
$sql = <<SQL;
DROP TABLE $schemaname.$log
SQL

$rs = spi_exec_query($sql);
#-- spi_exec_query returns a plain hash reference, so status is a hash key
if ($rs->{status} ne 'SPI_OK_UTILITY') {
        elog ERROR, "Could not drop Snapshot log '$schemaname.$log'. STATUS='".$rs->{status}."' SQL='$sql'";
}

elog NOTICE, "Snapshot log '$log' dropped." if $main::DEBUG==1;

#-- Delete the Snapshot Log entry
$sql = <<SQL;
DELETE FROM public.pg_mlogs
WHERE masterschema=?
        AND mastername=?
SQL

$sth = $dbh_local->prepare($sql);
#-- prepare() yields undef on failure, so the error is read from the connection
if (! $sth) {
        elog ERROR, "Could not prepare the delete of the snapshot log entry on system table 'public.pg_mlogs'. ERR='".$dbh_local->errstr."' SQL='$sql'";
}
$sth->execute($schemaname, $mastername);
if ($sth->err) {
        elog ERROR, "Could not delete snapshot log entry on system table 'public.pg_mlogs'. ERR='".$sth->errstr."' SQL='$sql'";
}

elog NOTICE, "Snapshot log entry removed." if $main::DEBUG==1;

#-- Drop triggers on master table
$sql = <<SQL;
DROP TRIGGER ${log}_trg ON $schemaname.${mastername}
SQL

$dbh_local->do($sql);
if ($dbh_local->err) {
        elog ERROR, "Could not drop the snapshot log trigger on table '$schemaname.$mastername'. ERR='".$dbh_local->errstr."' SQL='$sql'";
}

elog NOTICE, "Snapshot log table trigger dropped." if $main::DEBUG==1;

#-- Drop trigger's function
$sql = <<SQL;
DROP FUNCTION $schemaname.${log}_trgfn();
SQL

$dbh_local->do($sql);
if ($dbh_local->err) {
        #-- Report the function name that the statement above actually uses
        elog ERROR, "Could not drop the snapshot log trigger function '$schemaname.${log}_trgfn()'. ERR='".$dbh_local->errstr."' SQL='$sql'";
}

elog NOTICE, "Snapshot log trigger function dropped." if $main::DEBUG==1;

$dbh_local->commit;
$dbh_local->disconnect;
#-- All done. Let's return TRUE
return TRUE;

#--
#-- SUB: sqlLookup
#-- Runs $sql with the given bind values through sqlLookup2 in fatal-error
#-- mode and returns whatever sqlLookup2 returns (the first result row).
#--
sub sqlLookup {
        my $dbh = shift;
        my $sql = shift;
        my @binds = @_;
        return sqlLookup2($dbh, $sql, \@binds, TRUE);
}
#--
#-- SUB: sqlLookupSoft
#-- Same lookup as sqlLookup, but sqlLookup2 is told that errors are not fatal.
#--
sub sqlLookupSoft {
        my $dbh = shift;
        my $sql = shift;
        my @binds = @_;
        return sqlLookup2($dbh, $sql, \@binds, FALSE);
}

#--
#-- SUB: sqlLookup2
#-- Prepares and executes $sql on $dbh with the bind values in the array
#-- reference $params and returns the first result row as a list.
#-- When $errorsAreFatal is true a failure is reported through elog ERROR;
#-- otherwise the error is ignored and whatever could be fetched (possibly
#-- an empty list) is returned.
#--
sub sqlLookup2 {
        my ($dbh, $sql, $params, $errorsAreFatal) = @_;
        my @row;

        elog NOTICE, "Lookup at SQL:$sql with ".(@$params ? join(',', @$params) : '<none>') if $main::DEBUG==1;

        my $sth = $dbh->prepare($sql);
        #-- prepare() yields undef on failure, so there is no handle to execute
        if (! $sth) {
                if ($errorsAreFatal) {
                        elog ERROR, "Fatal error! SQL=$sql ERROR=".$dbh->errstr;
                }
                return @row;
        }
        #-- An empty parameter list is the same as calling execute() bare
        $sth->execute(@$params);
        if ($sth->err) {
                if ($errorsAreFatal) {
                        elog ERROR, "Fatal error! SQL=$sql ERROR=".$sth->errstr;
                }
        }
        @row = $sth->fetchrow_array;
        $sth->finish;
        return @row;
}
#--
#-- SUB: snapshotLogExists
#-- Asks the 'snapshotlog_exists' driver function about the given master
#-- object and returns true when its answer is the string 'T'.
#--
sub snapshotLogExists {
        my $dbh = shift;
        my ($masterschema, $mastername) = @_;

        #-- The driver function takes neither an operation nor an additional argument
        return call_driver_function($dbh, 'snapshotlog_exists', undef, $masterschema, $mastername, undef) eq 'T';
}
#--
#-- SUB: getSnapshotLogName
#-- Returns the value of the 'snapshotlog_name' driver function for the given
#-- master object, i.e. the snapshot log's table name.
#--
sub getSnapshotLogName {
        my $dbh = shift;
        my ($masterschema, $mastername) = @_;

        #-- The driver function takes neither an operation nor an additional argument
        return call_driver_function($dbh, 'snapshotlog_name', undef, $masterschema, $mastername, undef);
}


#-- Dependencies

#--
#-- SUB: call_driver_function
#-- Dispatches a call to one of the known native driver routines on the
#-- database behind the handle ($dbh). Schema and master names are upper-cased
#-- before use. 'snapshot_do' is run as a stored procedure and followed by a
#-- commit; every other known name is invoked as a function and its value is
#-- returned. An unknown name is reported through elog ERROR.
#--
sub call_driver_function {
        my ($dbh, $function_name, $operation, $masterschema, $mastername, $additional) = @_;
        $masterschema = uc($masterschema);
        $mastername = uc($mastername);

        elog NOTICE, "Calling DRIVER function $function_name ($operation, $masterschema, $mastername, $additional)" if $main::DEBUG==1;

        #-- Argument list of every driver routine that is called as a function
        my %function_args = (
                'snapshotlog_exists'      => [$masterschema, $mastername],
                'last_log_refresh'        => [$masterschema, $mastername, $additional],
                'snapshotlog_columns'     => [$masterschema, $mastername],
                'snapshotlog_name'        => [$masterschema, $mastername],
                'count_log_modified_rows' => [$masterschema, $mastername, $additional],
                'snapshotlog_ud_filter'   => [$additional],
                'snapshotlog_iu_filter'   => [$additional],
        );

        if ($function_name eq 'snapshot_do') {
                dbi_execute_stored_procedure($dbh, 'snapshot_do', ('123456', $operation, $masterschema, $mastername, $additional));
        } elsif (exists $function_args{$function_name}) {
                return dbi_call_function($dbh, $function_name, @{ $function_args{$function_name} });
        } else {
                elog ERROR, "Could not call DRIVER function: $function_name";
        }
        #-- Only reached after the stored procedure call
        $dbh->commit;
}

#--
#-- SUB: dbi_execute_stored_procedure
#-- Runs the stored procedure $procname on the database behind $dbh, binding
#-- @parameters to '?' placeholders. The call syntax depends on the DBMS name
#-- reported by the handle. Any result is discarded.
#-- Failures are reported through elog ERROR.
#--
sub dbi_execute_stored_procedure {
        use DBI::Const::GetInfoType;
        my ($dbh, $procname, @parameters) = @_;
        my $dbname = lc($dbh->get_info( $GetInfoType{SQL_DBMS_NAME} ));

        #-- One '?' placeholder per parameter
        my $qmarks = join(',', ('?') x @parameters);

        #-- Oracle gets an anonymous PL/SQL block; every other DBMS
        #-- (PostgreSQL included) gets a plain SELECT
        my $sql = ($dbname eq 'oracle')
                ? "BEGIN $procname($qmarks); END;"
                : "SELECT $procname($qmarks)";

        my $sth = $dbh->prepare($sql);
        #-- prepare() yields undef on failure, so the error is read from $dbh
        if (! $sth || $dbh->err) {
                elog ERROR, "Could not prepare the call to procedure '$procname'. ERROR='".$dbh->errstr."' SQL='$sql'";
        }
        $sth->execute(@parameters);
        if ($sth->err) {
                elog ERROR, "Could not execute procedure '$procname'. ERROR='".$sth->errstr."' SQL='$sql'";
        }
        #-- The result is not fetched; release the handle so it is not left active
        $sth->finish;
        return;
}

#--
#-- SUB: dbi_call_function
#-- Calls the function $funcname on the database behind $dbh, binding
#-- @parameters to '?' placeholders, and returns the first column of the first
#-- result row (undef when no row comes back). The call syntax depends on the
#-- DBMS name reported by the handle.
#-- Failures are reported through elog ERROR.
#--
sub dbi_call_function {
        use DBI::Const::GetInfoType;
        my ($dbh, $funcname, @parameters) = @_;
        my $dbname = lc($dbh->get_info( $GetInfoType{SQL_DBMS_NAME} ));

        #-- One '?' placeholder per parameter
        my $qmarks = join(',', ('?') x @parameters);

        #-- Oracle needs FROM DUAL; every other DBMS (PostgreSQL included)
        #-- gets a bare SELECT
        my $sql = ($dbname eq 'oracle')
                ? "SELECT $funcname($qmarks) FROM DUAL"
                : "SELECT $funcname($qmarks)";

        my $sth = $dbh->prepare($sql);
        #-- prepare() yields undef on failure, so the error is read from $dbh
        if (! $sth || $dbh->err) {
                elog ERROR, "Could not prepare the call to function '$funcname'. ERROR='".$dbh->errstr."' SQL='$sql'";
        }
        $sth->execute(@parameters);
        if ($sth->err) {
                elog ERROR, "Could not execute function '$funcname'. ERROR='".$sth->errstr."' SQL='$sql'";
        }
        my @row = $sth->fetchrow_array;
        #-- Only the first row is used; release the statement handle
        $sth->finish;
        return $row[0];
}

$BODY$
  LANGUAGE 'plperlu' VOLATILE;
ALTER FUNCTION public.drop_snapshot_log(schemaname text, mastername text) OWNER TO postgres;
COMMENT ON FUNCTION public.drop_snapshot_log(schemaname text, mastername text) IS $$
This function is part of PostgreSQL::Snapshots project.
This is the function that drops a snapshot log.
$$;

_______________________________________________
Grupo de Usuários do PostgreSQL no Brasil
Antes de perguntar consulte o manual
http://pgdocptbr.sourceforge.net/

Para editar suas opções ou sair da lista acesse a página da lista em:
http://pgfoundry.org/mailman/listinfo/brasil-usuarios