diff options
author | Arne Georg Gleditsch <argggh@lxr.linpro.no> | 2007-11-29 09:46:52 +0100 |
---|---|---|
committer | Arne Georg Gleditsch <argggh@lxr.linpro.no> | 2007-11-29 09:46:52 +0100 |
commit | f8a02b3594a655aefb592e4a94ba2e348304e47d (patch) | |
tree | 498a5630f6a98b3973db24a2ac070412cb63f3f9 | |
parent | 04b972005468db794529a732b764306c6d70fe97 (diff) |
Store caches as preallocated plain-string buffers, to avoid the overhead and unpleasant memory allocation patterns that arrays entail.
-rw-r--r-- | lib/LXRng/Index/PgBatch.pm | 179 |
1 file changed, 142 insertions, 37 deletions
```diff
diff --git a/lib/LXRng/Index/PgBatch.pm b/lib/LXRng/Index/PgBatch.pm
index 3b4e446..006d7b6 100644
--- a/lib/LXRng/Index/PgBatch.pm
+++ b/lib/LXRng/Index/PgBatch.pm
@@ -44,7 +44,7 @@ sub new {

     my $self = $class->SUPER::new(@args);
     $$self{'writes'} = 0;
-    $$self{'rows'} = 0;
+    $$self{'blocks'} = 0;

     return $self;
 }
@@ -72,33 +72,42 @@ sub flush {
         $$self{'dbh'}->{InactiveDestroy} = 1 if $$self{'dbh'};
         undef $$self{'cache'};
         undef $$self{'dbh'};
-        foreach my $table (qw(symbols identifiers usage includes)) {
-            if (exists($$cache{$table})) {
-                $self->dbh->do(qq{copy $pre$table from stdin});
-                foreach my $l (@{$$cache{$table}}) {
-                    $i++;
-                    $self->dbh->pg_putline($l);
-                }
-                $self->dbh->pg_endcopy;
+        # Table list must be ordered wrt foreign constraints.
+        foreach my $table (qw(symbols identifiers usage
+                              includes filereleases))
+        {
+            next unless exists $$cache{$table};
+            my $idx = 0;
+            my $len = $$self{'cache_idx'}{$table};
+            next unless $len > 0;
+
+            $self->dbh->do(qq{copy $pre$table from stdin});
+            while ($len > 0) {
+                $i++;
+                $self->dbh->pg_putline(substr($$cache{$table}, $idx,
+                                              $len > 4096 ? 4096 : $len));
+                $idx += 4096;
+                $len -= 4096;
             }
+            $self->dbh->pg_endcopy;
         }

         $self->dbh->commit() unless $self->dbh->{AutoCommit};
-        # Analyze after first 50k rows, then for every 3M row.
+        # Analyze after first 1k blocks, then for every 1M block.
         $self->dbh->do(q(analyze)) if
-            (($$self{'rows'} % 3000000) + $i > 3000000) or
-            (($$self{'rows'} < 50000) and ($$self{'rows'} + $i > 50000));
+            (($$self{'blocks'} % 1000000) + $i > 1000000) or
+            (($$self{'blocks'} < 1000) and ($$self{'blocks'} + $i > 1000));
         $self->dbh->disconnect();
         print("$i\n");
         close(STDOUT);
         kill(9, $$);
     }
-    $$self{'flush_pid'} = $pid;
-    foreach my $table (%{$$self{'cache'}}) {
-        @{$$self{'cache'}{$table}} = ();
+
+    foreach my $table (keys %{$$self{'cache_idx'}}) {
+        $$self{'cache_idx'}{$table} = 0;
     }
-    %{$$self{'cache'}} = ();
-    delete($$self{'cache'});
+
+    $$self{'flush_pid'} = $pid;
     warn "*** index: flushing in background\n";
 }

@@ -110,22 +119,32 @@ sub _flush_wait {
     warn "*** index: waiting for running flush to complete...\n";
     $self->dbh->commit() unless $self->dbh->{AutoCommit};

-    my $rows;
-    if (sysread($$self{'flush_pipe'}, $rows, 1024) > 0) {
-        $rows += 0;
-        $$self{'rows'} += $rows;
-        warn "*** index: flushed $rows rows\n";
+    my $blocks;
+    if (sysread($$self{'flush_pipe'}, $blocks, 1024) > 0) {
+        $blocks += 0;
+        $$self{'blocks'} += $blocks;
+        warn "*** index: flushed $blocks blocks\n";
     }
     $$self{'flush_pipe'}->close();
     undef $$self{'flush_pipe'};
     waitpid($$self{'flush_pid'}, 0);
 }

-sub _cache {
-    my ($self, $name) = @_;
+sub _add_cached {
+    my ($self, $name, $line) = @_;
+
+    unless ($$self{'cache'}{$name}) {
+        $$self{'cache'}{$name} = "\0" x 1_000_000;
+        $$self{'cache_idx'}{$name} = 0;
+    }

-    $$self{'cache'}{$name} ||= [];
-    return $$self{'cache'}{$name};
+    $self->flush() if
+        $$self{'cache_idx'}{$name} + length($line) >
+        length($$self{'cache'}{$name});
+
+    substr($$self{'cache'}{$name}, $$self{'cache_idx'}{$name},
+           length($line), $line);
+    $$self{'cache_idx'}{$name} += length($line);
 }

 sub _cached_seqno {
@@ -153,7 +172,7 @@ sub _cached_seqno {
 sub _add_include {
     my ($self, $file_id, $inc_id) = @_;

-    push(@{$self->_cache('includes')}, "$file_id\t$inc_id\n");
+    $self->_add_cached('includes', "$file_id\t$inc_id\n");

     return 1;
 }
@@ -168,7 +187,7 @@ sub _prime_symbol_cache {
     $sth->execute();
     my %cache;
     while (my ($name, $id) = $sth->fetchrow_array()) {
-        $cache{$name} = $id;
+        $cache{$name} = 0+$id;
     }
     $sth->finish;

@@ -178,8 +197,8 @@ sub _prime_symbol_cache {
 sub _add_usage {
     my ($self, $file_id, $symbol_id, $lines) = @_;

-    push(@{$self->_cache('usage')},
-         "$file_id\t$symbol_id\t\{".join(",", @$lines)."}\n");
+    $self->_add_cached('usage',
+                       "$file_id\t$symbol_id\t\{".join(",", @$lines)."}\n");

     return 1;
 }
@@ -188,12 +207,12 @@ sub _add_symbol {
     my ($self, $symbol) = @_;

     my $id = $self->_cached_seqno('symnum');
-    push(@{$self->_cache('symbols')}, "$id\t$symbol\n");
+    $self->_add_cached('symbols', "$id\t$symbol\n");

     $self->_prime_symbol_cache() unless
         exists $$self{'__symbol_cache'};

-    $$self{'__symbol_cache'}{$symbol} = $id;
+    $$self{'__symbol_cache'}{$symbol} = 0+$id;

     return $id;
 }
@@ -204,9 +223,11 @@ sub _add_ident {
     $ctx_id = '\\N' unless defined($ctx_id);

     my $id = $self->_cached_seqno('identnum');
-    push(@{$self->_cache('identifiers')}, join("\t", $id, $sym_id,
-                                               $rfile_id, $line, $type,
-                                               $ctx_id)."\n");
+
+    $self->_add_cached('identifiers',
+                       join("\t", $id, $sym_id,
+                            $rfile_id, $line, $type,
+                            $ctx_id)."\n");

     return $id;
 }
@@ -240,7 +261,7 @@ sub _prime_fileid_cache {
     $sth->execute();
     my %cache;
     while (my ($name, $id) = $sth->fetchrow_array()) {
-        $cache{$name} = $id;
+        $cache{$name} = 0+$id;
     }
     $sth->finish;

@@ -254,7 +275,7 @@ sub _add_file {
     $self->_prime_fileid_cache() unless
         exists $$self{'__fileid_cache'};

-    $$self{'__fileid_cache'}{$path} = $id;
+    $$self{'__fileid_cache'}{$path} = 0+$id;

     return $id;
 }
@@ -276,6 +297,90 @@ sub _get_file {
     return undef;
 }

+sub _add_filerelease {
+    my ($self, $rfile_id, $rel_id) = @_;
+
+    $self->_add_cached('filereleases', "$rfile_id\t$rel_id\n");
+
+    return 1;
+}
+
+sub _get_rfile {
+    my ($self, $file_id, $revision) = @_;
+
+    my $key = "$file_id\t$revision";
+    if (exists($$self{'__revision_epoch_cache'}{$key})) {
+        my ($id, $epoch) = split(/\t/, $$self{'__revision_epoch_cache'}{$key});
+        return ($id, $epoch);
+    }
+
+    my ($id, $epoch) = $self->SUPER::_get_rfile($file_id, $revision);
+    if ($id > 0 and $epoch > 0) {
+        $$self{'__revision_epoch_cache'}{$key} = "$id\t$epoch";
+        $$self{'__revision_id_cache'}{$id} = $key;
+    }
+    return ($id, $epoch);
+}
+
+
+sub _add_rfile {
+    my ($self, $file_id, $revision, $time) = @_;
+
+    my $id = $self->SUPER::_add_rfile($file_id, $revision, $time);
+    my ($epoch, $zone) = $time =~ /^(\d+)(?: ([-+]\d\d\d\d)|)$/;
+
+    my $key = "$file_id\t$revision";
+    $$self{'__revision_epoch_cache'}{$key} = "$id\t$epoch";
+    $$self{'__revision_id_cache'}{$id} = $key;
+
+    return $id;
+}
+
+sub _update_rfile_timestamp {
+    my ($self, $rfile_id, $time) = @_;
+
+    if (exists $$self{'__revision_id_cache'}{$rfile_id}) {
+        my $key = $$self{'__revision_id_cache'}{$rfile_id};
+        my ($epoch, $zone) = $time =~ /^(\d+)(?: ([-+]\d\d\d\d)|)$/;
+        $$self{'__revision_epoch_cache'}{$key} = "$rfile_id\t$epoch";
+    }
+
+    return $self->SUPER::_update_rfile_timestamp($rfile_id, $time);
+}
+
+sub _to_task {
+    my ($self, $rfile_id, $task) = @_;
+
+    my @tasks = qw(indexed referenced hashed);
+    unless (exists $$self{'__filestat_cache'}) {
+        my $tasks = join('||', map {
+            qq{(case when $_ then '1' else '0' end)} } @tasks);
+        my $dbh = $self->dbh;
+        my $pre = $self->prefix;
+        my $sth = $$self{'sth'}{'_prime_filestat_cache'} ||=
+            $dbh->prepare(qq{select id_rfile, 1||$tasks
+                             from ${pre}filestatus});
+        $sth->execute();
+        my @cache;
+        while (my ($id, $stats) = $sth->fetchrow_array()) {
+            $cache[$id] = 0+$stats;
+        }
+        $sth->finish;
+
+        $$self{'__filestat_cache'} = \@cache;
+    }
+
+    if (exists $$self{'__filestat_cache'}[$rfile_id]) {
+        my %stat;
+        my $flags = $$self{'__filestat_cache'}[$rfile_id];
+        @stat{'',@tasks} = split(//, $flags);
+
+        return 0 if $stat{$task};
+    }
+
+    return $self->SUPER::_to_task($rfile_id, $task);
+}
+
 sub DESTROY {
     my ($self) = @_;

```