about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Arne Georg Gleditsch <argggh@lxr.linpro.no> 2007-11-27 01:13:37 +0100
committer Arne Georg Gleditsch <argggh@lxr.linpro.no> 2007-11-27 01:13:37 +0100
commit 828f8807dcc27d236bc9f36b57987fd5cd505f95 (patch)
tree 931af80f1c098843dd7f9e824b3542348b9030e2
parent 53aeb447f17030c94cd7c2048929c432a8761aff (diff)
Improve flushing logic,
-rw-r--r-- lib/LXRng/Index/Pg.pm 26
-rw-r--r-- lib/LXRng/Index/PgBatch.pm 55
2 files changed, 60 insertions, 21 deletions
diff --git a/lib/LXRng/Index/Pg.pm b/lib/LXRng/Index/Pg.pm
index a385767..7afb2a8 100644
--- a/lib/LXRng/Index/Pg.pm
+++ b/lib/LXRng/Index/Pg.pm
@@ -8,12 +8,16 @@ use base qw(LXRng::Index::DBI);
sub dbh {
my ($self) = @_;
- $$self{'dbh'} ||= DBI->connect('dbi:Pg:'.$$self{'db_spec'},
- $$self{'db_user'}, $$self{'db_pass'},
- {AutoCommit => 1,
- RaiseError => 1})
+ return $$self{'dbh'} if $$self{'dbh'};
+
+ $$self{'dbh'} = DBI->connect('dbi:Pg:'.$$self{'db_spec'},
+ $$self{'db_user'}, $$self{'db_pass'},
+ {AutoCommit => 1,
+ RaiseError => 1,
+ pg_server_prepare => 1})
or die($DBI::errstr);
-
+ $$self{'dbh_pid'} = $$;
+
return $$self{'dbh'};
}
@@ -425,9 +429,15 @@ sub DESTROY {
my ($self) = @_;
if ($$self{'dbh'}) {
- $$self{'dbh'}->rollback();
- $$self{'dbh'}->disconnect();
- delete($$self{'dbh'});
+ if ($$self{'dbh_pid'} != $$) {
+ $$self{'dbh'}->{InactiveDestroy} = 1;
+ undef $$self{'dbh'};
+ }
+ else {
+ $$self{'dbh'}->rollback();
+ $$self{'dbh'}->disconnect();
+ delete($$self{'dbh'});
+ }
}
}
diff --git a/lib/LXRng/Index/PgBatch.pm b/lib/LXRng/Index/PgBatch.pm
index 40654d2..af2c212 100644
--- a/lib/LXRng/Index/PgBatch.pm
+++ b/lib/LXRng/Index/PgBatch.pm
@@ -32,7 +32,7 @@ sub transaction {
# Only occasional synchronization if we're inside another
# transaction.
- if ($self->{'writes'}++ % 997 == 0) {
+ if ($self->{'writes'}++ % 491 == 0) {
$self->flush();
$self->dbh->commit();
}
@@ -44,6 +44,7 @@ sub new {
my $self = $class->SUPER::new(@args);
$$self{'writes'} = 0;
+ $$self{'rows'} = 0;
return $self;
}
@@ -57,19 +58,24 @@ sub flush {
my $pre = $self->prefix;
$self->dbh->commit() unless $self->dbh->{AutoCommit};
- my $pid = fork();
+ my $pid = open($$self{'flush_pipe'}, "-|");
die("fork failed: $!") unless defined($pid);
if ($pid == 0) {
$SIG{'INT'} = 'IGNORE';
$SIG{'QUIT'} = 'IGNORE';
$SIG{'TERM'} = 'IGNORE';
+ $SIG{'PIPE'} = 'IGNORE';
+ undef $$self{'flush_pipe'};
my $i = 0;
- $$self{'dbh'} = undef;
- foreach my $table (qw(symbols identifiers usage)) {
- if (exists($$self{'cache'}{$table})) {
+ my $cache = $$self{'cache'};
+ $$self{'dbh'}->{InactiveDestroy} = 1 if $$self{'dbh'};
+ undef $$self{'cache'};
+ undef $$self{'dbh'};
+ foreach my $table (qw(symbols identifiers usage includes)) {
+ if (exists($$cache{$table})) {
$self->dbh->do(qq{copy $pre$table from stdin});
- foreach my $l (@{$$self{'cache'}{$table}}) {
+ foreach my $l (@{$$cache{$table}}) {
$i++;
$self->dbh->pg_putline($l);
}
@@ -77,25 +83,41 @@ sub flush {
}
}
$self->dbh->commit() unless $self->dbh->{AutoCommit};
- $self->dbh->do(q(analyze)) if $i > 500000;
+ # Analyze after first 50k rows, then for every 3M row.
+ $self->dbh->do(q(analyze)) if
+ (($$self{'rows'} % 3000000) + $i > 3000000) or
+ (($$self{'rows'} < 50000) and ($$self{'rows'} + $i > 50000));
+
$self->dbh->disconnect();
- warn "\n*** index: flushed $i rows\n";
+ print("$i\n");
+ close(STDOUT);
kill(9, $$);
}
$$self{'flush_pid'} = $pid;
+ foreach my $table (%{$$self{'cache'}}) {
+ @{$$self{'cache'}{$table}} = ();
+ }
+ %{$$self{'cache'}} = ();
delete($$self{'cache'});
- warn "\n*** index: flushing in background\n";
+
+ warn "*** index: flushing in background\n";
}
sub _flush_wait {
my ($self) = @_;
- return unless $$self{'flush_pid'};
- waitpid($$self{'flush_pid'}, WNOHANG); # Reap zombies
- return unless kill(0, $$self{'flush_pid'});
+ return unless $$self{'flush_pipe'};
- warn "\n*** index: waiting for running flush to complete...\n";
+ warn "*** index: waiting for running flush to complete...\n";
$self->dbh->commit() unless $self->dbh->{AutoCommit};
+ my $rows;
+ if (sysread($$self{'flush_pipe'}, $rows, 1024) > 0) {
+ $rows += 0;
+ $$self{'rows'} += $rows;
+ warn "*** index: flushed $rows rows\n";
+ }
+ $$self{'flush_pipe'}->close();
+ undef $$self{'flush_pipe'};
waitpid($$self{'flush_pid'}, 0);
}
@@ -203,6 +225,13 @@ sub _get_symbol {
sub DESTROY {
my ($self) = @_;
+ if ($$self{'dbh'} and ($$self{'dbh_pid'} != $$)) {
+ # Don't flush or disconnect inherited db handle.
+ $$self{'dbh'}->{InactiveDestroy} = 1;
+ undef $$self{'dbh'};
+ return;
+ }
+
if ($$self{'writes'} > 0) {
$self->flush();
$self->_flush_wait();