aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorArne Georg Gleditsch <argggh@lxr.linpro.no>2007-11-29 09:46:52 +0100
committerArne Georg Gleditsch <argggh@lxr.linpro.no>2007-11-29 09:46:52 +0100
commitf8a02b3594a655aefb592e4a94ba2e348304e47d (patch)
tree498a5630f6a98b3973db24a2ac070412cb63f3f9
parent04b972005468db794529a732b764306c6d70fe97 (diff)
Store caches as preallocated plain-string buffers, to avoid the overhead and unpleasant memory allocation patterns that arrays entail.
-rw-r--r--lib/LXRng/Index/PgBatch.pm179
1 files changed, 142 insertions, 37 deletions
diff --git a/lib/LXRng/Index/PgBatch.pm b/lib/LXRng/Index/PgBatch.pm
index 3b4e446..006d7b6 100644
--- a/lib/LXRng/Index/PgBatch.pm
+++ b/lib/LXRng/Index/PgBatch.pm
@@ -44,7 +44,7 @@ sub new {
my $self = $class->SUPER::new(@args);
$$self{'writes'} = 0;
- $$self{'rows'} = 0;
+ $$self{'blocks'} = 0;
return $self;
}
@@ -72,33 +72,42 @@ sub flush {
$$self{'dbh'}->{InactiveDestroy} = 1 if $$self{'dbh'};
undef $$self{'cache'};
undef $$self{'dbh'};
- foreach my $table (qw(symbols identifiers usage includes)) {
- if (exists($$cache{$table})) {
- $self->dbh->do(qq{copy $pre$table from stdin});
- foreach my $l (@{$$cache{$table}}) {
- $i++;
- $self->dbh->pg_putline($l);
- }
- $self->dbh->pg_endcopy;
+ # Table list must be ordered with respect to foreign-key constraints.
+ foreach my $table (qw(symbols identifiers usage
+ includes filereleases))
+ {
+ next unless exists $$cache{$table};
+ my $idx = 0;
+ my $len = $$self{'cache_idx'}{$table};
+ next unless $len > 0;
+
+ $self->dbh->do(qq{copy $pre$table from stdin});
+ while ($len > 0) {
+ $i++;
+ $self->dbh->pg_putline(substr($$cache{$table}, $idx,
+ $len > 4096 ? 4096 : $len));
+ $idx += 4096;
+ $len -= 4096;
}
+ $self->dbh->pg_endcopy;
}
$self->dbh->commit() unless $self->dbh->{AutoCommit};
- # Analyze after first 50k rows, then for every 3M row.
+ # Analyze once the first 1k blocks are flushed, then each time another 1M-block boundary is crossed.
$self->dbh->do(q(analyze)) if
- (($$self{'rows'} % 3000000) + $i > 3000000) or
- (($$self{'rows'} < 50000) and ($$self{'rows'} + $i > 50000));
+ (($$self{'blocks'} % 1000000) + $i > 1000000) or
+ (($$self{'blocks'} < 1000) and ($$self{'blocks'} + $i > 1000));
$self->dbh->disconnect();
print("$i\n");
close(STDOUT);
kill(9, $$);
}
- $$self{'flush_pid'} = $pid;
- foreach my $table (%{$$self{'cache'}}) {
- @{$$self{'cache'}{$table}} = ();
+
+ foreach my $table (keys %{$$self{'cache_idx'}}) {
+ $$self{'cache_idx'}{$table} = 0;
}
- %{$$self{'cache'}} = ();
- delete($$self{'cache'});
+
+ $$self{'flush_pid'} = $pid;
warn "*** index: flushing in background\n";
}
@@ -110,22 +119,32 @@ sub _flush_wait {
warn "*** index: waiting for running flush to complete...\n";
$self->dbh->commit() unless $self->dbh->{AutoCommit};
- my $rows;
- if (sysread($$self{'flush_pipe'}, $rows, 1024) > 0) {
- $rows += 0;
- $$self{'rows'} += $rows;
- warn "*** index: flushed $rows rows\n";
+ my $blocks;
+ if (sysread($$self{'flush_pipe'}, $blocks, 1024) > 0) {
+ $blocks += 0;
+ $$self{'blocks'} += $blocks;
+ warn "*** index: flushed $blocks blocks\n";
}
$$self{'flush_pipe'}->close();
undef $$self{'flush_pipe'};
waitpid($$self{'flush_pid'}, 0);
}
-sub _cache {
- my ($self, $name) = @_;
+sub _add_cached {  # Append $line to the string buffer kept for $name, calling flush() first when it would not fit.
+ my ($self, $name, $line) = @_;  # $name selects the buffer; $line is the text to append
+
+ unless ($$self{'cache'}{$name}) {  # first use of this buffer name
+ $$self{'cache'}{$name} = "\0" x 1_000_000;  # preallocate a 1,000,000-character NUL-filled string
+ $$self{'cache_idx'}{$name} = 0;  # write offset into that string (characters used so far)
+ }
- $$self{'cache'}{$name} ||= [];
- return $$self{'cache'}{$name};
+ $self->flush() if  # flush when the new line would run past the end of the buffer
+ $$self{'cache_idx'}{$name} + length($line) >
+ length($$self{'cache'}{$name});
+
+ substr($$self{'cache'}{$name}, $$self{'cache_idx'}{$name},  # four-argument substr: overwrite in place at the current offset
+ length($line), $line);
+ $$self{'cache_idx'}{$name} += length($line);  # advance the write offset past the line just stored
}
sub _cached_seqno {
@@ -153,7 +172,7 @@ sub _cached_seqno {
sub _add_include {
my ($self, $file_id, $inc_id) = @_;
- push(@{$self->_cache('includes')}, "$file_id\t$inc_id\n");
+ $self->_add_cached('includes', "$file_id\t$inc_id\n");  # buffer one tab-separated, newline-terminated row under the 'includes' cache name
return 1;
}
@@ -168,7 +187,7 @@ sub _prime_symbol_cache {
$sth->execute();
my %cache;
while (my ($name, $id) = $sth->fetchrow_array()) {
- $cache{$name} = $id;
+ $cache{$name} = 0+$id;
}
$sth->finish;
@@ -178,8 +197,8 @@ sub _prime_symbol_cache {
sub _add_usage {
my ($self, $file_id, $symbol_id, $lines) = @_;
- push(@{$self->_cache('usage')},
- "$file_id\t$symbol_id\t\{".join(",", @$lines)."}\n");
+ $self->_add_cached('usage',  # buffer one row under the 'usage' cache name
+ "$file_id\t$symbol_id\t\{".join(",", @$lines)."}\n");  # row: file id, symbol id, then @$lines as a brace-wrapped comma-separated list
return 1;
}
@@ -188,12 +207,12 @@ sub _add_symbol {
my ($self, $symbol) = @_;
my $id = $self->_cached_seqno('symnum');
- push(@{$self->_cache('symbols')}, "$id\t$symbol\n");
+ $self->_add_cached('symbols', "$id\t$symbol\n");
$self->_prime_symbol_cache()
unless exists $$self{'__symbol_cache'};
- $$self{'__symbol_cache'}{$symbol} = $id;
+ $$self{'__symbol_cache'}{$symbol} = 0+$id;
return $id;
}
@@ -204,9 +223,11 @@ sub _add_ident {
$ctx_id = '\\N' unless defined($ctx_id);
my $id = $self->_cached_seqno('identnum');
- push(@{$self->_cache('identifiers')}, join("\t", $id, $sym_id,
- $rfile_id, $line, $type,
- $ctx_id)."\n");
+
+ $self->_add_cached('identifiers',
+ join("\t", $id, $sym_id,
+ $rfile_id, $line, $type,
+ $ctx_id)."\n");
return $id;
}
@@ -240,7 +261,7 @@ sub _prime_fileid_cache {
$sth->execute();
my %cache;
while (my ($name, $id) = $sth->fetchrow_array()) {
- $cache{$name} = $id;
+ $cache{$name} = 0+$id;
}
$sth->finish;
@@ -254,7 +275,7 @@ sub _add_file {
$self->_prime_fileid_cache()
unless exists $$self{'__fileid_cache'};
- $$self{'__fileid_cache'}{$path} = $id;
+ $$self{'__fileid_cache'}{$path} = 0+$id;
return $id;
}
@@ -276,6 +297,90 @@ sub _get_file {
return undef;
}
+sub _add_filerelease {  # Buffer one "rfile_id<TAB>rel_id" row under the 'filereleases' cache name via _add_cached.
+ my ($self, $rfile_id, $rel_id) = @_;
+
+ $self->_add_cached('filereleases', "$rfile_id\t$rel_id\n");  # tab-separated, newline-terminated row
+
+ return 1;  # unconditionally returns 1
+}
+
+sub _get_rfile {  # Memoizing wrapper around SUPER::_get_rfile; returns the list ($id, $epoch) for a file id and revision.
+ my ($self, $file_id, $revision) = @_;
+
+ my $key = "$file_id\t$revision";  # cache key: file id and revision joined by a tab
+ if (exists($$self{'__revision_epoch_cache'}{$key})) {  # cache hit: answer without calling the parent class
+ my ($id, $epoch) = split(/\t/, $$self{'__revision_epoch_cache'}{$key});  # cached value is the string "id<TAB>epoch"
+ return ($id, $epoch);
+ }
+
+ my ($id, $epoch) = $self->SUPER::_get_rfile($file_id, $revision);  # cache miss: ask the parent class
+ if ($id > 0 and $epoch > 0) {  # only results where both values are positive are cached
+ $$self{'__revision_epoch_cache'}{$key} = "$id\t$epoch";
+ $$self{'__revision_id_cache'}{$id} = $key;  # reverse map (id -> key), consulted by _update_rfile_timestamp
+ }
+ return ($id, $epoch);  # returned whether or not the result was cached
+}
+
+
+sub _add_rfile {  # Delegate to SUPER::_add_rfile, then record the returned id in both revision caches; returns that id.
+ my ($self, $file_id, $revision, $time) = @_;
+
+ my $id = $self->SUPER::_add_rfile($file_id, $revision, $time);
+ my ($epoch, $zone) = $time =~ /^(\d+)(?: ([-+]\d\d\d\d)|)$/;  # $time: digits, optionally followed by a space, a sign and four digits; $zone is captured but unused here
+
+ my $key = "$file_id\t$revision";  # same key layout as _get_rfile uses
+ $$self{'__revision_epoch_cache'}{$key} = "$id\t$epoch";  # NOTE(review): $epoch is undef when $time does not match the pattern above
+ $$self{'__revision_id_cache'}{$id} = $key;  # reverse map (id -> key)
+
+ return $id;
+}
+
+sub _update_rfile_timestamp {  # Refresh the cached epoch for $rfile_id when it is cached, then delegate to the parent class.
+ my ($self, $rfile_id, $time) = @_;
+
+ if (exists $$self{'__revision_id_cache'}{$rfile_id}) {  # only ids already present in the reverse map are touched
+ my $key = $$self{'__revision_id_cache'}{$rfile_id};  # "file_id<TAB>revision" key stored for this id
+ my ($epoch, $zone) = $time =~ /^(\d+)(?: ([-+]\d\d\d\d)|)$/;  # leading digits become $epoch; $zone is captured but unused here
+ $$self{'__revision_epoch_cache'}{$key} = "$rfile_id\t$epoch";  # overwrite the cached "id<TAB>epoch" entry
+ }
+
+ return $self->SUPER::_update_rfile_timestamp($rfile_id, $time);  # the parent's return value is passed through
+}
+
+sub _to_task {  # Return 0 when the cached file status already has $task set; otherwise defer to SUPER::_to_task.
+ my ($self, $rfile_id, $task) = @_;
+
+ my @tasks = qw(indexed referenced hashed);  # task names; also interpolated as boolean expressions in the query below
+ unless (exists $$self{'__filestat_cache'}) {  # build the status cache once, on the first call
+ my $tasks = join('||', map {  # SQL expression concatenating one flag character per task
+ qq{(case when $_ then '1' else '0' end)} } @tasks);  # each task yields the character '1' or '0'
+ my $dbh = $self->dbh;
+ my $pre = $self->prefix;
+ my $sth = $$self{'sth'}{'_prime_filestat_cache'} ||=  # prepared statement is kept in $$self{'sth'} for reuse
+ $dbh->prepare(qq{select id_rfile, 1||$tasks
+ from ${pre}filestatus});  # a literal 1 is prepended to the flags; presumably so leading '0' flags survive the numeric conversion below
+ $sth->execute();
+ my @cache;  # flag values indexed by id_rfile
+ while (my ($id, $stats) = $sth->fetchrow_array()) {
+ $cache[$id] = 0+$stats;  # stored as a number rather than a string
+ }
+ $sth->finish;
+
+ $$self{'__filestat_cache'} = \@cache;
+ }
+
+ if (exists $$self{'__filestat_cache'}[$rfile_id]) {  # only ids that were assigned an entry above
+ my %stat;
+ my $flags = $$self{'__filestat_cache'}[$rfile_id];
+ @stat{'',@tasks} = split(//, $flags);  # one character per key; the '' key takes the leading 1
+
+ return 0 if $stat{$task};  # flag already set for this task
+ }
+
+ return $self->SUPER::_to_task($rfile_id, $task);  # not cached or flag not set: let the parent class decide
+}
+
sub DESTROY {
my ($self) = @_;