"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "src/broker/Manager.cc" between
zeek-4.0.2.tar.gz and zeek-4.0.3.tar.gz

About: Zeek (formerly Bro) is a flexible network analysis framework focusing on network security monitoring. LTS (Long Term Support) release.

Manager.cc  (zeek-4.0.2):Manager.cc  (zeek-4.0.3)
skipping to change at line 60 skipping to change at line 60
openssl_certificate = get_option("Broker::ssl_certificate")->AsSt ring()->CheckString(); openssl_certificate = get_option("Broker::ssl_certificate")->AsSt ring()->CheckString();
openssl_key = get_option("Broker::ssl_keyfile")->AsString()->Chec kString(); openssl_key = get_option("Broker::ssl_keyfile")->AsString()->Chec kString();
openssl_passphrase = get_option("Broker::ssl_passphrase")->AsStri ng()->CheckString(); openssl_passphrase = get_option("Broker::ssl_passphrase")->AsStri ng()->CheckString();
} }
}; };
class BrokerState { class BrokerState {
public: public:
BrokerState(BrokerConfig config, size_t congestion_queue_size) BrokerState(BrokerConfig config, size_t congestion_queue_size)
: endpoint(std::move(config)), : endpoint(std::move(config)),
subscriber(endpoint.make_subscriber({broker::topics::statuses, subscriber(endpoint.make_subscriber({broker::topic::statuses(),
broker::topics::errors}, broker::topic::errors()},
congestion_queue_size)) congestion_queue_size))
{ {
} }
broker::endpoint endpoint; broker::endpoint endpoint;
broker::subscriber subscriber; broker::subscriber subscriber;
}; };
const broker::endpoint_info Manager::NoPeer{{}, {}}; const broker::endpoint_info Manager::NoPeer{{}, {}};
skipping to change at line 220 skipping to change at line 220
get_option("Broker::moderate_interval")->AsCount()); get_option("Broker::moderate_interval")->AsCount());
config.set("caf.work-stealing.relaxed-steal-interval", config.set("caf.work-stealing.relaxed-steal-interval",
get_option("Broker::relaxed_interval")->AsCount()); get_option("Broker::relaxed_interval")->AsCount());
auto cqs = get_option("Broker::congestion_queue_size")->AsCount(); auto cqs = get_option("Broker::congestion_queue_size")->AsCount();
bstate = std::make_shared<BrokerState>(std::move(config), cqs); bstate = std::make_shared<BrokerState>(std::move(config), cqs);
if ( ! iosource_mgr->RegisterFd(bstate->subscriber.fd(), this) ) if ( ! iosource_mgr->RegisterFd(bstate->subscriber.fd(), this) )
reporter->FatalError("Failed to register broker subscriber with i osource_mgr"); reporter->FatalError("Failed to register broker subscriber with i osource_mgr");
bstate->subscriber.add_topic(broker::topics::store_events, true); bstate->subscriber.add_topic(broker::topic::store_events(), true);
InitializeBrokerStoreForwarding(); InitializeBrokerStoreForwarding();
} }
void Manager::InitializeBrokerStoreForwarding() void Manager::InitializeBrokerStoreForwarding()
{ {
const auto& globals = zeek::detail::global_scope()->Vars(); const auto& globals = zeek::detail::global_scope()->Vars();
for ( const auto& global : globals ) for ( const auto& global : globals )
{ {
skipping to change at line 934 skipping to change at line 934
run_state::detail::update_network_time(util::current_time()); run_state::detail::update_network_time(util::current_time());
auto messages = bstate->subscriber.poll(); auto messages = bstate->subscriber.poll();
bool had_input = ! messages.empty(); bool had_input = ! messages.empty();
for ( auto& message : messages ) for ( auto& message : messages )
{ {
auto& topic = broker::get_topic(message); auto& topic = broker::get_topic(message);
if ( broker::topics::statuses.prefix_of(topic) ) if ( broker::is_prefix(topic, broker::topic::statuses_str) )
{ {
if ( auto stat = broker::make_status_view(get_data(messag e)) ) if ( auto stat = broker::make_status_view(get_data(messag e)) )
{ {
ProcessStatus(stat); ProcessStatus(stat);
} }
else else
{ {
auto str = to_string(message); auto str = to_string(message);
reporter->Warning("ignoring malformed Broker stat us event: %s", reporter->Warning("ignoring malformed Broker stat us event: %s",
str.c_str()); str.c_str());
} }
continue; continue;
} }
if ( broker::topics::errors.prefix_of(topic) ) if ( broker::is_prefix(topic, broker::topic::errors_str) )
{ {
if ( auto err = broker::make_error_view(get_data(message) ) ) if ( auto err = broker::make_error_view(get_data(message) ) )
{ {
ProcessError(err); ProcessError(err);
} }
else else
{ {
auto str = to_string(message); auto str = to_string(message);
reporter->Warning("ignoring malformed Broker erro r event: %s", reporter->Warning("ignoring malformed Broker erro r event: %s",
str.c_str()); str.c_str());
} }
continue; continue;
} }
if ( broker::topics::store_events.prefix_of(topic) ) if ( broker::is_prefix(topic, broker::topic::store_events_str) )
{ {
ProcessStoreEvent(broker::move_data(message)); ProcessStoreEvent(broker::move_data(message));
continue; continue;
} }
try try
{ {
// Once we call a broker::move_* function, we force Broke r to // Once we call a broker::move_* function, we force Broke r to
// unshare the content of the message, i.e., copy the con tent to a // unshare the content of the message, i.e., copy the con tent to a
// different memory region if other threads keep referenc es to the // different memory region if other threads keep referenc es to the
 End of changes. 5 change blocks. 
6 lines changed or deleted 6 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)