diff options
author | Arne Georg Gleditsch <argggh@lxr.linpro.no> | 2007-11-27 01:13:37 +0100 |
---|---|---|
committer | Arne Georg Gleditsch <argggh@lxr.linpro.no> | 2007-11-27 01:13:37 +0100 |
commit | 828f8807dcc27d236bc9f36b57987fd5cd505f95 (patch) | |
tree | 931af80f1c098843dd7f9e824b3542348b9030e2 | |
parent | 53aeb447f17030c94cd7c2048929c432a8761aff (diff) |
Improve flushing logic,
-rw-r--r-- | lib/LXRng/Index/Pg.pm | 26 | ||||
-rw-r--r-- | lib/LXRng/Index/PgBatch.pm | 55 |
2 files changed, 60 insertions, 21 deletions
diff --git a/lib/LXRng/Index/Pg.pm b/lib/LXRng/Index/Pg.pm index a385767..7afb2a8 100644 --- a/lib/LXRng/Index/Pg.pm +++ b/lib/LXRng/Index/Pg.pm @@ -8,12 +8,16 @@ use base qw(LXRng::Index::DBI); sub dbh { my ($self) = @_; - $$self{'dbh'} ||= DBI->connect('dbi:Pg:'.$$self{'db_spec'}, - $$self{'db_user'}, $$self{'db_pass'}, - {AutoCommit => 1, - RaiseError => 1}) + return $$self{'dbh'} if $$self{'dbh'}; + + $$self{'dbh'} = DBI->connect('dbi:Pg:'.$$self{'db_spec'}, + $$self{'db_user'}, $$self{'db_pass'}, + {AutoCommit => 1, + RaiseError => 1, + pg_server_prepare => 1}) or die($DBI::errstr); - + $$self{'dbh_pid'} = $$; + return $$self{'dbh'}; } @@ -425,9 +429,15 @@ sub DESTROY { my ($self) = @_; if ($$self{'dbh'}) { - $$self{'dbh'}->rollback(); - $$self{'dbh'}->disconnect(); - delete($$self{'dbh'}); + if ($$self{'dbh_pid'} != $$) { + $$self{'dbh'}->{InactiveDestroy} = 1; + undef $$self{'dbh'}; + } + else { + $$self{'dbh'}->rollback(); + $$self{'dbh'}->disconnect(); + delete($$self{'dbh'}); + } } } diff --git a/lib/LXRng/Index/PgBatch.pm b/lib/LXRng/Index/PgBatch.pm index 40654d2..af2c212 100644 --- a/lib/LXRng/Index/PgBatch.pm +++ b/lib/LXRng/Index/PgBatch.pm @@ -32,7 +32,7 @@ sub transaction { # Only occasional synchronization if we're inside another # transaction. - if ($self->{'writes'}++ % 997 == 0) { + if ($self->{'writes'}++ % 491 == 0) { $self->flush(); $self->dbh->commit(); } @@ -44,6 +44,7 @@ sub new { my $self = $class->SUPER::new(@args); $$self{'writes'} = 0; + $$self{'rows'} = 0; return $self; } @@ -57,19 +58,24 @@ sub flush { my $pre = $self->prefix; $self->dbh->commit() unless $self->dbh->{AutoCommit}; - my $pid = fork(); + my $pid = open($$self{'flush_pipe'}, "-|"); die("fork failed: $!") unless defined($pid); if ($pid == 0) { $SIG{'INT'} = 'IGNORE'; $SIG{'QUIT'} = 'IGNORE'; $SIG{'TERM'} = 'IGNORE'; + $SIG{'PIPE'} = 'IGNORE'; + undef $$self{'flush_pipe'}; my $i = 0; - $$self{'dbh'} = undef; - foreach my $table (qw(symbols identifiers usage)) { - if (exists($$self{'cache'}{$table})) { + my $cache = $$self{'cache'}; + $$self{'dbh'}->{InactiveDestroy} = 1 if $$self{'dbh'}; + undef $$self{'cache'}; + undef $$self{'dbh'}; + foreach my $table (qw(symbols identifiers usage includes)) { + if (exists($$cache{$table})) { $self->dbh->do(qq{copy $pre$table from stdin}); - foreach my $l (@{$$self{'cache'}{$table}}) { + foreach my $l (@{$$cache{$table}}) { $i++; $self->dbh->pg_putline($l); } @@ -77,25 +83,41 @@ sub flush { } } $self->dbh->commit() unless $self->dbh->{AutoCommit}; - $self->dbh->do(q(analyze)) if $i > 500000; + # Analyze after first 50k rows, then for every 3M row. + $self->dbh->do(q(analyze)) if + (($$self{'rows'} % 3000000) + $i > 3000000) or + (($$self{'rows'} < 50000) and ($$self{'rows'} + $i > 50000)); + $self->dbh->disconnect(); - warn "\n*** index: flushed $i rows\n"; + print("$i\n"); + close(STDOUT); kill(9, $$); } $$self{'flush_pid'} = $pid; + foreach my $table (%{$$self{'cache'}}) { + @{$$self{'cache'}{$table}} = (); + } + %{$$self{'cache'}} = (); delete($$self{'cache'}); - warn "\n*** index: flushing in background\n"; + + warn "*** index: flushing in background\n"; } sub _flush_wait { my ($self) = @_; - return unless $$self{'flush_pid'}; - waitpid($$self{'flush_pid'}, WNOHANG); # Reap zombies - return unless kill(0, $$self{'flush_pid'}); + return unless $$self{'flush_pipe'}; - warn "\n*** index: waiting for running flush to complete...\n"; + warn "*** index: waiting for running flush to complete...\n"; $self->dbh->commit() unless $self->dbh->{AutoCommit}; + my $rows; + if (sysread($$self{'flush_pipe'}, $rows, 1024) > 0) { + $rows += 0; + $$self{'rows'} += $rows; + warn "*** index: flushed $rows rows\n"; + } + $$self{'flush_pipe'}->close(); + undef $$self{'flush_pipe'}; waitpid($$self{'flush_pid'}, 0); } @@ -203,6 +225,13 @@ sub _get_symbol { sub DESTROY { my ($self) = @_; + if ($$self{'dbh'} and ($$self{'dbh_pid'} != $$)) { + # Don't flush or disconnect inherited db handle. + $$self{'dbh'}->{InactiveDestroy} = 1; + undef $$self{'dbh'}; + return; + } + if ($$self{'writes'} > 0) { $self->flush(); $self->_flush_wait(); |