diff --git a/proxy/conntrack.pl b/proxy/conntrack.pl index f87460d..96bd9ff 100755 --- a/proxy/conntrack.pl +++ b/proxy/conntrack.pl @@ -3,12 +3,17 @@ $|=1; use DBI; use FindBin qw($Bin); +use threads; +use threads::shared; +use Thread::Semaphore; my $param = shift @ARGV; our $verbose = 0; if ($param eq '-v') { $verbose = 1; +} elsif ($param eq '-vv') { + $verbose = 2; } require "$Bin/conntrack.conf"; @@ -21,7 +26,8 @@ $sts->execute (); $query = " SELECT id, - ip, + ip, + nome, port FROM proxy_conn @@ -31,33 +37,35 @@ $query = " SELECT $sts = $dbmysql->prepare($query); $sts->execute (); +$queryDB = ""; +share ($queryDB); +$sem1 = Thread::Semaphore->new(1); +$sem2 = Thread::Semaphore->new(0); + +my %proc; +# Generazione figli per DB +$proc{'database'} = threads->create(\&database); +# Generazione figli per cattura pacchetti while ( $ref = $sts->fetchrow_hashref ) { my $id = $$ref{'id'}; my $ip = $$ref{'ip'}; my $port = $$ref{'port'}; + my $nome = $$ref{'nome'}; + $proc{$nome} = threads->create(\&conntrack, $id, $ip, $port); +} - - if (!fork) { - - while (1) { - my $pidi = fork; - if (!$pidi) { - conntrack ($id, $ip, $port); - exit; - } else { - sleep 15; - waitpid ($pidi, WNOHANG); - print "Agente $id morto - $ipid -\n" if ($verbose); - } +while (1) { + sleep 30; + while (($variabile, $puntatore) = each %proc) { + if(!$puntatore or !$puntatore->is_running ) { + print "Rilevato termine processo $variabile, riavvio" if ($verbose); + $puntatore->detach(); + $proc{$variabile} = threads->new(\&$variabile); } } -} -if ($verbose) { - while (1) { sleep 1000; } } -exit 1; - +exit; sub conntrack { my $sess_id = shift; @@ -66,15 +74,10 @@ sub conntrack { print "Attivato agente $sess_id su $proxy_IP:$proxy_PORT\n" if ($verbose); - my $dbmysql = DBI->connect("DBI:mysql:;host=$DBhost", $DBuser, $DBpass) or die ($DBI::errstr); - $sts = $dbmysql->prepare("use $DBname"); - $sts->execute (); - open CT, "/usr/sbin/conntrack -E -eNEW,DESTROY -otimestamp,id -p tcp -d $proxy_IP --dport $proxy_PORT |" or die "non va\n"; while () { my $riga = $_; -# print "$riga"; if ( $riga =~ /^\s*\[(\d+)\.\d+\]\s+\[NEW\] tcp\s+\d+ \d+ \S*\s*src=(\S+) dst=(\S+) sport=\d+ dport=(\d+)\s+\[\w+\] src=\S+ dst=\S+ sport=\d+ dport=\d+ id=(\d+)/ ) { my $remote_START = $1; @@ -90,11 +93,10 @@ sub conntrack { ('$remote_IP', $thread_ID, FROM_UNIXTIME($remote_START)) "; -# print "$query\n"; - $sts = $dbmysql->prepare($query); - $sts->execute (); - -# print "$riga"; + $sem1->down(); + $queryDB = $query; + $sem2->up(); + print "Invio query INSERT\n" if ($verbose > 1); } if ( $riga =~ /^\s*\[(\d+)\.\d+\]\s+\[DESTROY\] tcp\s+\d+ src=(\S+) dst=(\S+) sport=\d+ dport=(\d+) packets=\d+ bytes=(\d+) src=\S+ dst=\S+ sport=\d+ dport=\d+ packets=\d+ bytes=(\d+) \[\w+\] id=(\d+)/) { @@ -125,12 +127,30 @@ sub conntrack { data_destroy IS NULL "; -# print "$query\n"; - - $sts = $dbmysql->prepare($query); - $sts->execute (); + $sem1->down(); + $queryDB = $query; + $sem2->up(); + print "Invio query UPDATE\n" if ($verbose > 1); } } close CT; } +sub database { + print "Attivato thread DB\n" if ($verbose); + + my $dbmysql = DBI->connect("DBI:mysql:;host=$DBhost", $DBuser, $DBpass) or die ($DBI::errstr); + $sts = $dbmysql->prepare("use $DBname"); + $sts->execute (); + + while (1) { + print "DB in attesa\n" if ($verbose > 1); + $sem2->down(); + print "Query arrivata\n" if ($verbose > 1); + my $sts = $dbmysql->prepare($queryDB); + $sts->execute () || die; + $sts->finish; + $sem1->up(); + } + +}