#!/usr/bin/perl
#
# Connection logger built on conntrack(8).
#
# For every active row in proxy_conn one worker thread runs
# "conntrack -E" filtered on that row's ip/port and turns NEW / DESTROY
# events into SQL statements.  A single "database" thread owns the MySQL
# connection used for writing and executes those statements one at a time.
# The main thread only supervises: every 30 seconds it restarts any thread
# that is no longer running.
#
# Usage: script [-v | -vv]
#   -v   print lifecycle messages and one line per closed connection
#   -vv  additionally trace the query hand-off between threads

use strict;
use warnings;

use DBI;
use FindBin qw($Bin);
use threads;
use threads::shared;
use Thread::Semaphore;

$| = 1;    # unbuffered STDOUT

# Connection settings used below; they are expected to be assigned as
# package globals by conntrack.conf (loaded with require further down).
our ($DBhost, $DBuser, $DBpass, $DBname);

# Verbosity: 0 = silent, 1 = -v, 2 = -vv.
my $param = shift @ARGV;
$param = '' unless defined $param;    # no argument given
our $verbose = 0;
if ($param eq '-v') {
    $verbose = 1;
}
elsif ($param eq '-vv') {
    $verbose = 2;
}

require "$Bin/conntrack.conf";

# Enable per-connection packet/byte accounting; the DESTROY pattern in
# conntrack() relies on the packets=/bytes= fields being present.
`sysctl -w net.netfilter.nf_conntrack_acct=1`;

# Main-thread DB handle: used for the start-up queries and for looking up
# a worker's parameters when it has to be restarted.
my $dbmysql = DBI->connect("DBI:mysql:;host=$DBhost", $DBuser, $DBpass)
    or die($DBI::errstr);
$dbmysql->do("use $DBname");

# IPs already present in proxy_pool.  NOTE: this array is not shared, so
# every thread works on its own copy taken at thread creation time.
our @listaip = ();
my $sts = $dbmysql->prepare("SELECT ip FROM proxy_pool GROUP BY ip");
$sts->execute();
while (my $ref = $sts->fetchrow_hashref) {
    push @listaip, $ref->{'ip'};
}

$sts = $dbmysql->prepare(
    "SELECT id, ip, nome, port FROM proxy_conn WHERE attivo = 1");
$sts->execute();

# Single-slot mailbox between the conntrack workers and the DB thread:
#   $sem1 (starts at 1) - slot is free, a worker may write $queryDB
#   $sem2 (starts at 0) - slot is full, the DB thread may read $queryDB
my $queryDB :shared = "";
my $sem1 = Thread::Semaphore->new(1);
my $sem2 = Thread::Semaphore->new(0);

# Thread name => thread object ('database' or the proxy_conn.nome value).
my %proc;

# Spawn the database writer thread
$proc{'database'} = threads->create(\&database);

# Spawn one capture thread per active proxy_conn row
while (my $ref = $sts->fetchrow_hashref) {
    my $nome = $ref->{'nome'};
    $proc{$nome} =
        threads->create(\&conntrack, $ref->{'id'}, $ref->{'ip'}, $ref->{'port'});
    print "Attivato processo $nome\n" if ($verbose);
}

# Supervisor loop: restart whatever has stopped.
while (1) {
    sleep 30;

    # Iterate over a snapshot of the keys because entries are reassigned
    # inside the loop.
    for my $variabile (keys %proc) {
        my $puntatore = $proc{$variabile};
        next if $puntatore and $puntatore->is_running;

        print "Rilevato termine processo $variabile\n" if ($verbose);

        # Reap the finished thread so it does not linger unjoined.
        $puntatore->join() if $puntatore and $puntatore->is_joinable;

        if ($variabile eq "database") {
            $proc{'database'} = threads->create(\&database);
            next;
        }

        # The main handle can sit idle for a long time; reconnect if the
        # server has dropped it.
        $dbmysql = DBConn() if not $dbmysql->ping;

        my $sth = $dbmysql->prepare(
            "SELECT id, ip, port FROM proxy_conn WHERE nome = ?");
        my $ref;
        if ($sth and $sth->execute($variabile)) {
            $ref = $sth->fetchrow_hashref;
            $sth->finish;
        }
        if (!$ref) {
            # Nothing to start the worker with; retry on the next pass.
            print "Nessuna riga in proxy_conn per $variabile\n" if ($verbose);
            next;
        }

        $proc{$variabile} =
            threads->create(\&conntrack, $ref->{'id'}, $ref->{'ip'}, $ref->{'port'});
    }
}

exit;    # not reached: the supervisor loop never ends

# conntrack($sess_id, $proxy_IP, $proxy_PORT)
#
# Worker thread body.  Runs "conntrack -E" for TCP NEW/DESTROY events whose
# destination is $proxy_IP:$proxy_PORT and, for each event line:
#   NEW      -> INSERT into proxy_dati, plus an INSERT into proxy_pool the
#               first time this thread sees the source IP
#   DESTROY  -> UPDATE of the matching proxy_dati row with end time, byte
#               counters, duration and throughput
# $sess_id is only used in verbose output.  Returns when the conntrack
# process closes its output; dies if the process cannot be started.
sub conntrack {
    my ($sess_id, $proxy_IP, $proxy_PORT) = @_;

    print "Attivato agente $sess_id su $proxy_IP:$proxy_PORT\n" if ($verbose);

    # List-form pipe open: the arguments never pass through a shell.
    open(my $ct, '-|', '/usr/sbin/conntrack',
        '-E', '-eNEW,DESTROY', '-otimestamp,id',
        '-p', 'tcp', '-d', $proxy_IP, '--dport', $proxy_PORT)
        or die "non va\n";

    while (my $riga = <$ct>) {
        if ( $riga =~ /^\s*\[(\d+)\.\d+\]\s+\[NEW\] tcp\s+\d+ \d+ \S*\s*src=(\S+) dst=(\S+) sport=\d+ dport=(\d+)\s+\[\w+\] src=\S+ dst=\S+ sport=\d+ dport=\d+ id=(\d+)/ ) {
            my $remote_START = $1;
            my $remote_IP    = $2;
            my $thread_ID    = $5;

            _invia_query(
                " INSERT INTO proxy_dati (ip, thread, data_new) VALUES ('$remote_IP', $thread_ID, FROM_UNIXTIME($remote_START)) "
            );
            print "Invio query INSERT\n" if ($verbose > 1);

            # Exact string comparison: the IP must not be used as a regex.
            if (!grep { $_ eq $remote_IP } @listaip) {
                _invia_query(
                    "INSERT INTO proxy_pool (ip,pool,attivo,ins) VALUE ('$remote_IP', 1, 0,now())"
                );
                push @listaip, $remote_IP;
                print "Invio query INSERT in proxy_pool\n" if ($verbose > 1);
            }
        }

        if ( $riga =~ /^\s*\[(\d+)\.\d+\]\s+\[DESTROY\] tcp\s+\d+ src=(\S+) dst=(\S+) sport=\d+ dport=(\d+) packets=\d+ bytes=(\d+) src=\S+ dst=\S+ sport=\d+ dport=\d+ packets=\d+ bytes=(\d+) \[\w+\] id=(\d+)/) {
            my $remote_DESTROY = $1;
            my $remote_IP      = $2;
            my $remote_SEND    = $5;    # bytes= of the first (original) tuple
            my $remote_RECEIVE = $6;    # bytes= of the second (reply) tuple
            my $thread_ID      = $7;

            printf("%s - IP: %-15s - ID: %-12s - Inviati: %10d - Ricevuti: %10d\n",
                $sess_id, $remote_IP, $thread_ID, $remote_SEND, $remote_RECEIVE)
                if ($verbose);

            # Only rows still carrying the epoch placeholder in
            # data_destroy are updated.
            _invia_query(qq{
                UPDATE proxy_dati SET
                    data_destroy = FROM_UNIXTIME($remote_DESTROY),
                    inviati = $remote_SEND,
                    ricevuti = $remote_RECEIVE,
                    tempo = TIMESTAMPDIFF(SECOND, data_new, FROM_UNIXTIME($remote_DESTROY)),
                    vel_inv = $remote_SEND / TIMESTAMPDIFF(SECOND, data_new, FROM_UNIXTIME($remote_DESTROY)),
                    vel_ric = $remote_RECEIVE / TIMESTAMPDIFF(SECOND, data_new, FROM_UNIXTIME($remote_DESTROY))
                WHERE thread = $thread_ID
                    AND ip = '$remote_IP'
                    AND data_destroy = '1970-01-01 00:00:00'
            });
            print "Invio query UPDATE\n" if ($verbose > 1);
        }
    }

    close $ct;
    return;
}

# _invia_query($sql)
#
# Hands one SQL statement to the database thread: waits for the mailbox to
# be free, stores the statement in $queryDB and signals that it is ready.
sub _invia_query {
    my ($sql) = @_;
    $sem1->down();
    $queryDB = $sql;
    $sem2->up();
    return;
}

# database()
#
# Database thread body.  Opens its own connection, then loops forever:
# waits for a statement in $queryDB, executes it and frees the mailbox.
# The mailbox is released even when the statement (or a reconnect) fails,
# otherwise every worker would block on $sem1 for good.
sub database {
    print "Attivato thread DB\n" if ($verbose);
    my $dbh = DBConn();

    while (1) {
        print "DB in attesa\n" if ($verbose > 1);
        $sem2->down();
        print "Query arrivata\n" if ($verbose > 1);

        my $ok = eval {
            $dbh = DBConn() if not $dbh->ping;
            my $sth = $dbh->prepare($queryDB)
                or die(($dbh->errstr || 'prepare fallita') . "\n");
            $sth->execute();    # failures stay silent, as PrintError/RaiseError are off
            $sth->finish;
            1;
        };
        print "Errore thread DB: $@" if (!$ok and $verbose);

        $sem1->up();
    }
}

# DBConn()
#
# Opens a new MySQL connection (errors neither printed nor raised,
# autocommit on, driver auto-reconnect on), selects $DBname and returns the
# handle.  Dies with the DBI error string if the connection fails.
sub DBConn {
    print "Connessione DB\n" if ($verbose > 1);
    my $dbh = DBI->connect(
        "DBI:mysql:;host=$DBhost", $DBuser, $DBpass,
        {
            PrintError           => 0,
            RaiseError           => 0,
            AutoCommit           => 1,
            mysql_auto_reconnect => 1,
        }
    ) or die($DBI::errstr);
    $dbh->do("use $DBname");
    return $dbh;
}