Trasformazione processi in thread con semafori per ottimizzazione utilizzo DB

This commit is contained in:
cmaffio
2016-03-24 16:19:49 +01:00
parent a0fa2e684a
commit d1a974172e

View File

@@ -3,12 +3,17 @@ $|=1;
use DBI; use DBI;
use FindBin qw($Bin); use FindBin qw($Bin);
use threads;
use threads::shared;
use Thread::Semaphore;
my $param = shift @ARGV; my $param = shift @ARGV;
our $verbose = 0; our $verbose = 0;
if ($param eq '-v') { if ($param eq '-v') {
$verbose = 1; $verbose = 1;
} elsif ($param eq '-vv') {
$verbose = 2;
} }
require "$Bin/conntrack.conf"; require "$Bin/conntrack.conf";
@@ -22,6 +27,7 @@ $sts->execute ();
$query = " SELECT $query = " SELECT
id, id,
ip, ip,
nome,
port port
FROM FROM
proxy_conn proxy_conn
@@ -31,33 +37,35 @@ $query = " SELECT
$sts = $dbmysql->prepare($query); $sts = $dbmysql->prepare($query);
$sts->execute (); $sts->execute ();
$queryDB = "";
share ($queryDB);
$sem1 = Thread::Semaphore->new(1);
$sem2 = Thread::Semaphore->new(0);
my %proc;
# Generazione figli per DB
$proc{'database'} = threads->create(\&database);
# Generazione figli per cattura pacchetti
while ( $ref = $sts->fetchrow_hashref ) { while ( $ref = $sts->fetchrow_hashref ) {
my $id = $$ref{'id'}; my $id = $$ref{'id'};
my $ip = $$ref{'ip'}; my $ip = $$ref{'ip'};
my $port = $$ref{'port'}; my $port = $$ref{'port'};
my $nome = $$ref{'nome'};
$proc{$nome} = threads->create(\&conntrack, $id, $ip, $port);
if (!fork) {
while (1) {
my $pidi = fork;
if (!$pidi) {
conntrack ($id, $ip, $port);
exit;
} else {
sleep 15;
waitpid ($pidi, WNOHANG);
print "Agente $id morto - $ipid -\n" if ($verbose);
}
}
}
} }
if ($verbose) { while (1) {
while (1) { sleep 1000; } sleep 30;
} while (($variabile, $puntatore) = each %proc) {
exit 1; if(!$puntatore or !$puntatore->is_running ) {
print "Rilevato termine processo $variabile, riavvio" if ($verbose);
$puntatore->detach();
$proc{$variabile} = threads->new(\&$variabile);
}
}
}
exit;
sub conntrack { sub conntrack {
my $sess_id = shift; my $sess_id = shift;
@@ -66,15 +74,10 @@ sub conntrack {
print "Attivato agente $sess_id su $proxy_IP:$proxy_PORT\n" if ($verbose); print "Attivato agente $sess_id su $proxy_IP:$proxy_PORT\n" if ($verbose);
my $dbmysql = DBI->connect("DBI:mysql:;host=$DBhost", $DBuser, $DBpass) or die ($DBI::errstr);
$sts = $dbmysql->prepare("use $DBname");
$sts->execute ();
open CT, "/usr/sbin/conntrack -E -eNEW,DESTROY -otimestamp,id -p tcp -d $proxy_IP --dport $proxy_PORT |" or die "non va\n"; open CT, "/usr/sbin/conntrack -E -eNEW,DESTROY -otimestamp,id -p tcp -d $proxy_IP --dport $proxy_PORT |" or die "non va\n";
while (<CT>) { while (<CT>) {
my $riga = $_; my $riga = $_;
# print "$riga";
if ( $riga =~ /^\s*\[(\d+)\.\d+\]\s+\[NEW\] tcp\s+\d+ \d+ \S*\s*src=(\S+) dst=(\S+) sport=\d+ dport=(\d+)\s+\[\w+\] src=\S+ dst=\S+ sport=\d+ dport=\d+ id=(\d+)/ ) { if ( $riga =~ /^\s*\[(\d+)\.\d+\]\s+\[NEW\] tcp\s+\d+ \d+ \S*\s*src=(\S+) dst=(\S+) sport=\d+ dport=(\d+)\s+\[\w+\] src=\S+ dst=\S+ sport=\d+ dport=\d+ id=(\d+)/ ) {
my $remote_START = $1; my $remote_START = $1;
@@ -90,11 +93,10 @@ sub conntrack {
('$remote_IP', $thread_ID, FROM_UNIXTIME($remote_START)) ('$remote_IP', $thread_ID, FROM_UNIXTIME($remote_START))
"; ";
# print "$query\n"; $sem1->down();
$sts = $dbmysql->prepare($query); $queryDB = $query;
$sts->execute (); $sem2->up();
print "Invio query INSERT\n" if ($verbose > 1);
# print "$riga";
} }
if ( $riga =~ /^\s*\[(\d+)\.\d+\]\s+\[DESTROY\] tcp\s+\d+ src=(\S+) dst=(\S+) sport=\d+ dport=(\d+) packets=\d+ bytes=(\d+) src=\S+ dst=\S+ sport=\d+ dport=\d+ packets=\d+ bytes=(\d+) \[\w+\] id=(\d+)/) { if ( $riga =~ /^\s*\[(\d+)\.\d+\]\s+\[DESTROY\] tcp\s+\d+ src=(\S+) dst=(\S+) sport=\d+ dport=(\d+) packets=\d+ bytes=(\d+) src=\S+ dst=\S+ sport=\d+ dport=\d+ packets=\d+ bytes=(\d+) \[\w+\] id=(\d+)/) {
@@ -125,12 +127,30 @@ sub conntrack {
data_destroy IS NULL data_destroy IS NULL
"; ";
# print "$query\n"; $sem1->down();
$queryDB = $query;
$sts = $dbmysql->prepare($query); $sem2->up();
$sts->execute (); print "Invio query UPDATE\n" if ($verbose > 1);
} }
} }
close CT; close CT;
} }
sub database {
print "Attivato thread DB\n" if ($verbose);
my $dbmysql = DBI->connect("DBI:mysql:;host=$DBhost", $DBuser, $DBpass) or die ($DBI::errstr);
$sts = $dbmysql->prepare("use $DBname");
$sts->execute ();
while (1) {
print "DB in attesa\n" if ($verbose > 1);
$sem2->down();
print "Query arrivata\n" if ($verbose > 1);
my $sts = $dbmysql->prepare($queryDB);
$sts->execute () || die;
$sts->finish;
$sem1->up();
}
}