"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "src/tcpdemux.cpp" between
tcpflow-1.5.0.tar.gz and tcpflow-1.6.1.tar.gz

About: tcpflow is a TCP/IP packet demultiplexer that captures data transmitted as part of TCP connections (flows), and stores the data in a way that is convenient for protocol analysis and debugging.

tcpdemux.cpp  (tcpflow-1.5.0):tcpdemux.cpp  (tcpflow-1.6.1)
skipping to change at line 22 skipping to change at line 22
*/ */
#include "tcpflow.h" #include "tcpflow.h"
#include "tcpip.h" #include "tcpip.h"
#include "tcpdemux.h" #include "tcpdemux.h"
#include <iostream> #include <iostream>
#include <sstream> #include <sstream>
#include <vector> #include <vector>
#ifdef HAVE_SYS_WAIT_H
#include <sys/wait.h>
#endif
/* static */ uint32_t tcpdemux::max_saved_flows = 100; /* static */ uint32_t tcpdemux::max_saved_flows = 100;
/* static */ uint32_t tcpdemux::tcp_timeout = 0; /* static */ uint32_t tcpdemux::tcp_timeout = 0;
/* static */ int tcpdemux::tcp_subproc_max = 10;
/* static */ int tcpdemux::tcp_subproc = 0;
/* static */ int tcpdemux::tcp_alert_fd = -1;
/* static */ std::string tcpdemux::tcp_cmd = "";
tcpdemux::tcpdemux(): tcpdemux::tcpdemux():
#ifdef HAVE_SQLITE3 #ifdef HAVE_SQLITE3
db(),insert_flow(), db(),insert_flow(),
#endif #endif
flow_sorter(0),tcp_processor(0),
outdir("."),flow_counter(0),packet_counter(0), outdir("."),flow_counter(0),packet_counter(0),
xreport(0),pwriter(0),max_open_flows(),max_fds(get_max_fds()-NUM_RESERVED_FD S), xreport(0),pwriter(0),max_open_flows(),max_fds(get_max_fds()-NUM_RESERVED_FD S),
flow_map(),open_flows(),saved_flow_map(), unique_id(0),
flow_map(),open_flows(),saved_flow_map(),flow_fd_cache_map(0),
saved_flows(),start_new_connections(false),opt(),fs() saved_flows(),start_new_connections(false),opt(),fs()
{ {
tcp_processor = &tcpdemux::process_tcp;
}
void tcpdemux::alter_processing_core()
{
DEBUG(1) ("ensuring pcap core");
tcp_processor = &tcpdemux::dissect_tcp;
flow_sorter = new pcap_writer();
} }
void tcpdemux::openDB() void tcpdemux::openDB()
{ {
#ifdef HAVE_SQLITE3 #ifdef HAVE_SQLITE3
int rc = sqlite3_open("test.db", &db); int rc = sqlite3_open("test.db", &db);
if( rc ){ if( rc ){
fprintf(stderr, "Can't open database: %s\n", sqlite3_errmsg(db)); fprintf(stderr, "Can't open database: %s\n", sqlite3_errmsg(db));
db = 0; db = 0;
} }
skipping to change at line 208 skipping to change at line 226
sbuf = 0; sbuf = 0;
} }
} }
} }
tcp->close_file(); tcp->close_file();
if(xreport) tcp->dump_xml(xreport,xmladd.str()); if(xreport) tcp->dump_xml(xreport,xmladd.str());
/** /**
* Before we delete the tcp structure, save information about the saved flow * Before we delete the tcp structure, save information about the saved flow
*/ */
save_flow(tcp); save_flow(tcp);
if(opt.store_output && tcp_alert_fd>=0){
std::stringstream ss;
ss << "close\t" << tcp->flow_pathname.c_str() << "\n";
const std::string &sso = ss.str();
if(write(tcp_alert_fd,sso.c_str(),sso.size()) != (int)sso.size()){
perror("write");
}
}
if(tcp_cmd.size()>0 && tcp->flow_pathname.size()>0){
/* If we are at maximum number of subprocesses, wait for one to exit */
std::string cmd = tcp_cmd + " " + tcp->flow_pathname;
#ifdef HAVE_FORK
int status=0;
pid_t pid = 0;
while(tcp_subproc >= tcp_subproc_max){
pid = wait(&status);
tcp_subproc--;
}
/* Fork off a child */
pid = fork();
if(pid<0) die("Cannot fork child");
if(pid==0){
/* We are the child */
exit(system(cmd.c_str()));
}
tcp_subproc++;
#else
system(cmd.c_str());
#endif
}
delete tcp; delete tcp;
} }
void tcpdemux::remove_flow(const flow_addr &flow) void tcpdemux::remove_flow(const flow_addr &flow)
{ {
flow_map_t::iterator it = flow_map.find(flow); flow_map_t::iterator it = flow_map.find(flow);
if(it!=flow_map.end()){ if(it!=flow_map.end()){
post_process(it->second); post_process(it->second);
flow_map.erase(it); flow_map.erase(it);
} }
} }
void tcpdemux::remove_all_flows() void tcpdemux::remove_all_flows()
{ {
DEBUG(10) ("Cleaning up flows");
for(flow_map_t::iterator it=flow_map.begin();it!=flow_map.end();it++){ for(flow_map_t::iterator it=flow_map.begin();it!=flow_map.end();it++){
post_process(it->second); post_process(it->second);
} }
flow_map.clear(); flow_map.clear();
for(sparse_saved_flow_map_t::iterator it=flow_fd_cache_map.begin();it!=flow_
fd_cache_map.end();it++){
delete it->second;
}
flow_fd_cache_map.clear();
} }
/**************************************************************** /****************************************************************
*** tcpdemultiplexer *** tcpdemultiplexer
****************************************************************/ ****************************************************************/
/* Try to find the maximum number of FDs this system can have open */ /* Try to find the maximum number of FDs this system can have open */
unsigned int tcpdemux::get_max_fds(void) unsigned int tcpdemux::get_max_fds(void)
{ {
int max_descs = 0; int max_descs = 0;
skipping to change at line 323 skipping to change at line 381
delete flow0; // and delete the saved flow delete flow0; // and delete the saved flow
} }
/* Now save the flow */ /* Now save the flow */
saved_flow *sf = new saved_flow(tcp); saved_flow *sf = new saved_flow(tcp);
saved_flow_map[sf->addr] = sf; saved_flow_map[sf->addr] = sf;
saved_flows.push_back(sf); saved_flows.push_back(sf);
} }
/** /**
* dissect_tcp():
*
* Called to process tcp pkts in a way that dissected or isolated pcap flows are
* emerging afterwards. Similar notions go into the direction of "sorting" pcap
pkts
* as per flow context.
*
* Returns 0 if packet is processed, 1 if it is not processed, -1 if error
*/
int tcpdemux::dissect_tcp(const ipaddr &src, const ipaddr &dst,sa_family_t famil
y,
const u_char *ip_data, uint32_t ip_payload_len,
const be13::packet_info &pi)
{
DEBUG(6) ("dissecting pkt *********************");
struct be13::tcphdr *tcp_header = (struct be13::tcphdr *) ip_data;
flow_addr this_flow(src,dst,ntohs(tcp_header->th_sport),
ntohs(tcp_header->th_dport),family);
sparse_saved_flow_map_t::const_iterator it = flow_fd_cache_map.find(this_flo
w);
if(it!=flow_fd_cache_map.end()){
flow_sorter->update_sink(it->second->fcap);
}
else {
flow fn_gen_vehicle(this_flow, 0, pi); // impromptu flow name generator
std::string fn = fn_gen_vehicle.new_pcap_filename();
flow_sorter->refresh_sink(fn, pi.pcap_dlt);
FILE *sink= flow_sorter->yield_sink();
sparse_saved_flow *ssf = new sparse_saved_flow(this_flow, sink);
flow_fd_cache_map[ssf->addr] = ssf;
}
flow_sorter->writepkt(pi.pcap_hdr,pi.pcap_data);
return 0;
}
/**
* process_tcp(): * process_tcp():
* *
* Called to processes a tcp packet from either process_ip4() or process_ip6(). * Called to processes a tcp packet from either process_ip4() or process_ip6().
* The caller breaks out the ip addresses and finds the start of the tcp header. * The caller breaks out the ip addresses and finds the start of the tcp header.
* *
* Skips but otherwise ignores TCP options. * Skips but otherwise ignores TCP options.
* *
* creates a new tcp connection if necessary, then asks the connection to either * creates a new tcp connection if necessary, then asks the connection to either
* print the packet or store it. * print the packet or store it.
* *
skipping to change at line 459 skipping to change at line 554
*/ */
/* q: what if syn is set AND there is data? */ /* q: what if syn is set AND there is data? */
/* q: what if syn is set AND we already know about this connection? */ /* q: what if syn is set AND we already know about this connection? */
if (tcp==NULL){ if (tcp==NULL){
/* Don't process if this is not a SYN and there is no data. */ /* Don't process if this is not a SYN and there is no data. */
if(syn_set==false && tcp_datalen==0) return 0; if(syn_set==false && tcp_datalen==0) return 0;
/* Check if this is the server->client flow related to a client->server f
low that is being demultiplexed */
flow_addr reverse_flow(dst,src,ntohs(tcp_header->th_dport),ntohs(tcp_head
er->th_sport),family);
tcpip *reverse_tcp = find_tcpip(reverse_flow);
uint64_t uid;
if (reverse_tcp)
{
/* We found a matching client->server flow. Copy its session ID */
uid = reverse_tcp->myflow.session_id;
}
else
{
/* Assign a new unique ID */
uid = unique_id++;
}
/* Create a new connection. /* Create a new connection.
* delta will be 0, because it's a new connection! * delta will be 0, because it's a new connection!
*/ */
be13::tcp_seq isn = syn_set ? seq : seq-1; be13::tcp_seq isn = syn_set ? seq : seq-1;
tcp = create_tcpip(this_flow, isn, pi); tcp = create_tcpip(this_flow, isn, pi);
tcp->myflow.session_id = uid;
} }
/* Now tcp is valid */ /* Now tcp is valid */
tcp->myflow.tlast = pi.ts; // most recently seen packet tcp->myflow.tlast = pi.ts; // most recently seen packet
tcp->last_packet_number = packet_counter++; tcp->last_packet_number = packet_counter++;
tcp->myflow.len += pi.pcap_hdr->len; tcp->myflow.len += pi.pcap_hdr->len;
tcp->myflow.caplen += pi.pcap_hdr->caplen; tcp->myflow.caplen += pi.pcap_hdr->caplen;
tcp->myflow.packet_count++; tcp->myflow.packet_count++;
// Does not seem consitent => Print a notice // Does not seem consitent => Print a notice
skipping to change at line 523 skipping to change at line 634
/* process any data. /* process any data.
* Notice that this typically won't be called for the SYN or SYN/ACK, * Notice that this typically won't be called for the SYN or SYN/ACK,
* since they both have no data by definition. * since they both have no data by definition.
*/ */
if (tcp_datalen>0){ if (tcp_datalen>0){
if (opt.console_output) { if (opt.console_output) {
tcp->print_packet(tcp_data, tcp_datalen); tcp->print_packet(tcp_data, tcp_datalen);
} else { } else {
if (opt.store_output){ if (opt.store_output){
bool new_file = false;
if (tcp->fd < 0) new_file = true;
tcp->store_packet(tcp_data, tcp_datalen, delta,pi.ts); tcp->store_packet(tcp_data, tcp_datalen, delta,pi.ts);
if(new_file && tcp_alert_fd>=0){
std::stringstream ss;
ss << "open\t" << tcp->flow_pathname.c_str() << "\n";
const std::string &sso = ss.str();
if(write(tcp_alert_fd,sso.c_str(),sso.size())!=(int)sso.size(
)){
perror("write");
}
}
} }
} }
} }
if (rst_set){ if (rst_set){
remove_flow(this_flow); // take it out of the map remove_flow(this_flow); // take it out of the map
return 0; return 0;
} }
/* Count the FINs. /* Count the FINs.
skipping to change at line 578 skipping to change at line 701
/* make sure that the packet is at least as long as the min IP header */ /* make sure that the packet is at least as long as the min IP header */
if (pi.ip_datalen < sizeof(struct be13::ip4)) { if (pi.ip_datalen < sizeof(struct be13::ip4)) {
DEBUG(6) ("received truncated IP datagram!"); DEBUG(6) ("received truncated IP datagram!");
return -1; // couldn't process return -1; // couldn't process
} }
const struct be13::ip4 *ip_header = (struct be13::ip4 *) pi.ip_data; const struct be13::ip4 *ip_header = (struct be13::ip4 *) pi.ip_data;
DEBUG(100)("process_ip4. caplen=%d vlan=%d ip_p=%d",(int)pi.pcap_hdr->caple n,(int)pi.vlan(),(int)ip_header->ip_p); DEBUG(100)("process_ip4. caplen=%d vlan=%d ip_p=%d",(int)pi.pcap_hdr->caple n,(int)pi.vlan(),(int)ip_header->ip_p);
if(debug>200){ if(debug>200){
sbuf_t sbuf(pos0_t(),(const uint8_t *)pi.ip_data,pi.ip_datalen,pi.ip_data len,false); sbuf_t sbuf(pos0_t(),(const uint8_t *)pi.ip_data,pi.ip_datalen,pi.ip_data len,0,false, false,false);
sbuf.hex_dump(std::cerr); sbuf.hex_dump(std::cerr);
} }
/* for now we're only looking for TCP; throw away everything else */ /* for now we're only looking for TCP; throw away everything else */
if (ip_header->ip_p != IPPROTO_TCP) { if (ip_header->ip_p != IPPROTO_TCP) {
DEBUG(50) ("got non-TCP frame -- IP proto %d", ip_header->ip_p); DEBUG(50) ("got non-TCP frame -- IP proto %d", ip_header->ip_p);
return -1; // couldn't process return -1; // couldn't process
} }
/* check and see if we got everything. NOTE: we must use /* check and see if we got everything. NOTE: we must use
skipping to change at line 616 skipping to change at line 739
/* figure out where the IP header ends */ /* figure out where the IP header ends */
size_t ip_header_len = ip_header->ip_hl * 4; size_t ip_header_len = ip_header->ip_hl * 4;
/* make sure there's some data */ /* make sure there's some data */
if (ip_header_len > ip_len) { if (ip_header_len > ip_len) {
DEBUG(6) ("received truncated IP datagram!"); DEBUG(6) ("received truncated IP datagram!");
return -1; return -1;
} }
/* do TCP processing, faking an ipv6 address */ /* do TCP processing, faking an ipv6 address */
uint16_t ip_payload_len = ip_len - ip_header_len; uint16_t ip_payload_len = pi.ip_datalen - ip_header_len;
ipaddr src(ip_header->ip_src.addr); ipaddr src(ip_header->ip_src.addr);
ipaddr dst(ip_header->ip_dst.addr); ipaddr dst(ip_header->ip_dst.addr);
return process_tcp(src, dst, AF_INET, return (this->*tcp_processor)(src, dst ,AF_INET,
pi.ip_data + ip_header_len, ip_payload_len, pi.ip_data + ip_header_len,
pi); ip_payload_len,pi);
} }
#pragma GCC diagnostic warning "-Wcast-align" #pragma GCC diagnostic warning "-Wcast-align"
/* This is called when we receive an IPv6 datagram. /* This is called when we receive an IPv6 datagram.
* *
* Note: we don't support IPv6 extended headers * Note: we don't support IPv6 extended headers
*/ */
/* These might be defined from an include file, so undef them to be sure */ /* These might be defined from an include file, so undef them to be sure */
skipping to change at line 653 skipping to change at line 776
if (ip_header->ip6_ctlun.ip6_un1.ip6_un1_nxt != IPPROTO_TCP) { if (ip_header->ip6_ctlun.ip6_un1.ip6_un1_nxt != IPPROTO_TCP) {
DEBUG(50) ("got non-TCP frame -- IP proto %d", ip_header->ip6_ctlun.ip6_u n1.ip6_un1_nxt); DEBUG(50) ("got non-TCP frame -- IP proto %d", ip_header->ip6_ctlun.ip6_u n1.ip6_un1_nxt);
return -1; return -1;
} }
/* do TCP processing */ /* do TCP processing */
uint16_t ip_payload_len = ntohs(ip_header->ip6_ctlun.ip6_un1.ip6_un1_plen); uint16_t ip_payload_len = ntohs(ip_header->ip6_ctlun.ip6_un1.ip6_un1_plen);
ipaddr src(ip_header->ip6_src.addr.addr8); ipaddr src(ip_header->ip6_src.addr.addr8);
ipaddr dst(ip_header->ip6_dst.addr.addr8); ipaddr dst(ip_header->ip6_dst.addr.addr8);
return process_tcp(src, dst ,AF_INET6, return (this->*tcp_processor)(src, dst ,AF_INET6,
pi.ip_data + sizeof(struct be13::ip6_hdr),ip_payload_len, pi.ip_data + sizeof(struct be13::ip6_hdr),
pi); ip_payload_len,pi);
} }
/* This is called when we receive an IPv4 or IPv6 datagram. /* This is called when we receive an IPv4 or IPv6 datagram.
* This function calls process_ip4 or process_ip6 * This function calls process_ip4 or process_ip6
* Returns 0 if packet is processed, 1 if it is not processed, -1 if error. * Returns 0 if packet is processed, 1 if it is not processed, -1 if error.
*/ */
#pragma GCC diagnostic ignored "-Wcast-align" #pragma GCC diagnostic ignored "-Wcast-align"
int tcpdemux::process_pkt(const be13::packet_info &pi) int tcpdemux::process_pkt(const be13::packet_info &pi)
{ {
 End of changes. 17 change blocks. 
9 lines changed or deleted 139 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)