Шпаргалка по многопоточности :: Cетевой уголок Majestio

Шпаргалка по многопоточности


На одном из форумов я обсуждал вопрос производительность БД в многопоточном способе взаимодействия. Для замеров мне пришлось написать некоторого рода тест производительности. Захотелось это реализовать на языке Perl. Для шаблона построения подобных программ - данный код вполне пригоден. Оставлю его и тут:

#!/usr/bin/perl -w
 
# Подключаем библиотеки
use Time::HiRes qw(gettimeofday);
use threads;
use threads::shared;
use DBI;
 
# список создаваемых URL
@URLs = ();
@Req = ();
# реквизиты БД
$DBName   = "testo";
$UserName = "pgsql";
$DBHost   = "192.168.1.47";
$DBPort   = "5432";
# счетчики замеров
$TimeRaw  = 0;
$TimeId   = 0;
$SizeRaw  = 0;
$SizeId   = 0;

#################################################################################################
$|++; if ($^O =~ /win/i) { system("cls"); } else { system("clear"); }
print "\nПоехали тестировать ==============================================================\n\n";
#################################################################################################
 
# 1.Генерируем 1000 URL'ов произвольной длины в диапазоне 12...128
GenerateUrls(\@URLs);
# 2.Гененируем таблицу запросов (индексы из @URLs)
GenerateReq(\@Req);
# 3.Чистим таблицы и записываем "каталог" URLs в БД
PrepareTables(\@URLs);
# 4.Запускаем 10 потоков за запись "сырых" логов и засекаем время их исполнения
ThreadRawFunc();
# 5.Вызываем вакуум и вычисляем размер полученной таблицы LogsRaw
$SizeRaw = GatheringStats("LogsRaw");
# 6.Чистим таблицы и записываем "каталог" URLs в БД
PrepareTables(\@URLs);
# 7.Запускаем 10 потоков за запись "нормализованных" логов
ThreadIdFunc();
# 8.Вызываем вакуум и вычисляем размер полученной таблицы LogsId
$SizeId = GatheringStats("LogsId");
# 9.Отчет
print "\n Время записи RAW-логов: $TimeRaw сек (полученный размер: $SizeRaw Mb)".
      "\n Время записи \"нормализованных\"-логов: $TimeId сек (полученный размер: $SizeId Mb)\n\n";
 
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 
 
sub GenerateUrls {
  print " * Генерируем URL'ы ... ";
  my $Array = shift;
  srand();
  for my $I (1..1000) {
    my $S = "";
    for my $J (0..rand(244)+11) {    
      $S .= chr(ord('a')+rand(ord('z')-ord('a')));
    }
    push @{$Array}, $S;
  }
  print "Ok\n";
}

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 
 
sub GenerateReq {
  print " * Генерируем произвольный массив индексов запросов URLs ... ";
  my $Array = shift;
  srand();
  for my $I (1..100000) {
    push @{$Array}, int(rand(999));
  }
  print "Ok\n";
}

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 
 
sub PrepareTables {
  my $Array = shift;
  my $Dbh = DBI->connect("dbi:Pg:dbname=$DBName;host=$DBHost;port=$DBPort","$UserName","",{PrintError => 0});
  die "$DBI::errstr\n" if (defined($DBI::err));
  # -- чистим таблицу URL
  print "\n * Подготавливаем БД:\n";
  print "   - чистим таблицу URLs ... ";
  my $query = "DELETE FROM \"URLs\"";
  my $sth = $Dbh->prepare($query);
  my $rv = $sth->execute();
  die "Error query on \"$query\": " . $Dbh->errstr . "\n" unless(defined $rv);
  print "Ok\n";
  # -- чистим таблицу LogsRaw
  print "   - чистим таблицу LogRaw ... ";
  $query = "DELETE FROM \"LogsRaw\"";
  $sth = $Dbh->prepare($query);
  $rv = $sth->execute();
  die "Error query on \"$query\": " . $Dbh->errstr . "\n" unless(defined $rv);
  print "Ok\n";
  # -- чистим таблицу LogsId
  print "   - чистим таблицу LogId ... ";
  $query = "DELETE FROM \"LogsId\"";
  $sth = $Dbh->prepare($query);
  $rv = $sth->execute();
  die "Error query on \"$query\": " . $Dbh->errstr . "\n" unless(defined $rv);
  print "Ok\n";
  # -- сбрасываем счетчики Id таблиц
  print "   - сбрасываем счетчики Id таблиц ... ";
  $query = "ALTER SEQUENCE public.\"URLs_Id_seq\" INCREMENT 1 MINVALUE 1 MAXVALUE 9223372036854775807 START 1 RESTART 1 CACHE 1 NO CYCLE";
  $sth = $Dbh->prepare($query);
  $rv = $sth->execute();
  die "Error query on \"$query\": " . $Dbh->errstr . "\n" unless(defined $rv);
  $query = "ALTER SEQUENCE public.\"LogsId_Id_seq\" INCREMENT 1 MINVALUE 1 MAXVALUE 9223372036854775807 START 1 RESTART 1 CACHE 1 NO CYCLE";
  $sth = $Dbh->prepare($query);
  $rv = $sth->execute();
  die "Error query on \"$query\": " . $Dbh->errstr . "\n" unless(defined $rv);
  $query = "ALTER SEQUENCE public.\"Logs_Id_seq\" INCREMENT 1 MINVALUE 1 MAXVALUE 9223372036854775807 START 1 RESTART 1 CACHE 1 NO CYCLE";
  $sth = $Dbh->prepare($query);
  $rv = $sth->execute();
  die "Error query on \"$query\": " . $Dbh->errstr . "\n" unless(defined $rv);
  print "Ok\n";
  # -- вакуум
  print "   - вакуум + сбор статистики ... ";
  $query = "VACUUM FULL ANALYZE";
  $sth = $Dbh->prepare($query);
  $rv = $sth->execute();
  die "Error query on \"$query\": " . $Dbh->errstr . "\n" unless(defined $rv);
  print "Ok\n";
  # -- заполняем таблицу URLs
  print "   - запоняем таблицу URLs ... ";
  $query = "INSERT INTO \"URLs\" (\"URL\") VALUES ";
  for(my $I=0; $I<scalar(@{$Array}); $I++) {
    $query.="('".${$Array}[$I]."')";
    $query.= ", " if ($I+1<scalar(@{$Array}));  
  }
  $sth = $Dbh->prepare($query);
  $rv = $sth->execute();
  die "Error query on \"$query\": " . $Dbh->errstr . "\n" unless(defined $rv);
  print "Ok\n";
  # -- переиндексируем таблицу URL
  print "   - переиндексируем таблицу URLs ... ";
  $query = "REINDEX TABLE \"URLs\"";
  $sth = $Dbh->prepare($query);
  $rv = $sth->execute();
  die "Error query on \"$query\": " . $Dbh->errstr . "\n" unless(defined $rv);
  $Dbh->disconnect();
  print "Ок\n   - пауза 30 сек ... ";
  sleep(30);
  print "Ok\n\n";
}

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 
 
sub ThreadRawFunc {
  print " * Записываем LogsRaw ";
  my @Threads;
  my $Threads=10;
  # Создаём нужное количество потоков
  for my $T (1..$Threads) {
    push @Threads, threads->create(\&WriteLogsRaw, $T);
  }
  # Дожидаемся окончания работы всех потоков
  my $Start = gettimeofday;
  foreach my $T (@Threads) {
    $T->join();
  }
  $TimeRaw = gettimeofday() - $Start;
  print " Ok\n";
}

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 
 
sub ThreadIdFunc {
  print " * Записываем LogsId ";
  my @Threads;
  my $Threads=10;
  # Создаём нужное количество потоков
  for my $T (1..$Threads) {
    push @Threads, threads->create(\&WriteLogsId, $T);
  }
  # Дожидаемся окончания работы всех потоков
  my $Start = gettimeofday;
  foreach my $T (@Threads) {
    $T->join();
  }
  $TimeId = gettimeofday() - $Start;
  print " Ok\n";
}

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 
 
sub WriteLogsRaw {
  print ".";
  my $Dbh = DBI->connect("dbi:Pg:dbname=$DBName;host=$DBHost;port=$DBPort","$UserName","",{PrintError => 0});
  die "$DBI::errstr\n" if (defined($DBI::err));
  foreach my $I (@{Req}) {
    my $query = "INSERT INTO \"LogsRaw\" (\"URL\",\"Timestamp\") VALUES ('".$URLs[$I]."', NOW())";
    my $sth = $Dbh->prepare($query);
    my $rv = $sth->execute();
    die "Error query on \"$query\": " . $Dbh->errstr . "\n" unless(defined $rv);
  }
  $Dbh->disconnect();
}

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 
 
sub WriteLogsId {
  print ".";
  my $Dbh = DBI->connect("dbi:Pg:dbname=$DBName;host=$DBHost;port=$DBPort","$UserName","",{PrintError => 0});
  die "$DBI::errstr\n" if (defined($DBI::err));
  foreach my $I (@{Req}) {
    my $query = "INSERT INTO \"LogsId\" (\"IdUrl\",\"Timestamp\") ".
                "VALUES ((SELECT u.\"Id\" FROM \"URLs\" AS u WHERE u.\"URL\" = '".$URLs[$I]."'), NOW())";
    my $sth = $Dbh->prepare($query);
    my $rv = $sth->execute();
    die "Error query on \"$query\": " . $Dbh->errstr . "\n" unless(defined $rv);
  }
  $Dbh->disconnect();
}

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 
 
sub GatheringStats {
  my $Name = shift;
  print " * Производим вакуум и вычисляем размер таблицы \"".$Name."\" ... ";
  my $Dbh = DBI->connect("dbi:Pg:dbname=$DBName;host=$DBHost;port=$DBPort","$UserName","",{PrintError => 0});
  die "$DBI::errstr\n" if (defined($DBI::err));
  my $query = "VACUUM FULL ANALYZE";
  my $sth = $Dbh->prepare($query);
  my $rv = $sth->execute();
  die "Error query on \"$query\": " . $Dbh->errstr . "\n" unless(defined $rv);
  $query = "SELECT (relpages * 8192 / (1024*1024))::int as size_mb FROM pg_class WHERE relname LIKE '".$Name."'";
  $sth = $Dbh->prepare($query);
  $rv = $sth->execute();
  die "Error query on \"$query\": " . $Dbh->errstr . "\n" unless(defined $rv);
  my @array = $sth->fetchrow_array();
  $sth->finish();
  $Dbh->disconnect();
  print " Ok\n";
  return $array[0];
}
Рейтинг: 5/5 - 2 голосов