How To Connect Zeek To Python

I was recently asked how to send data from Zeek to Python. After flipping through the Zeek Broker documentation I couldn’t find a good example to reference, so here is my example.

The code for this demo is available here:

https://github.com/keithjjones/zeek-python-broker-demo

The first piece of our source code is the Python program here:

https://github.com/keithjjones/zeek-python-broker-demo/blob/master/broker-test.py

There is a prerequisite for this Python script, the Zeek Python Broker bindings must be downloaded and installed. You can find the installation instructions here:

https://docs.zeek.org/projects/broker/en/master/python.html#installation-in-a-virtual-environment

The trick is you must install the version of broker that corresponds to your Zeek’s version. Be sure to read that last sentence again. I figured this out the hard way.

Your version of Broker can be looked up by visiting the Zeek repository and selecting your Zeek version in the branch dropdown, such as v5.0.4. Then, go into the “auxil” directory to find Broker:

https://github.com/zeek/zeek/tree/v5.0.4/auxil

If you click into the Broker directory you will enter its repository at the version used in Zeek v5.0.4. If you click on “VERSION”, you will find version 2.3.5 of Broker is used in Zeek v5.0.4:

https://github.com/zeek/broker/blob/a7d55f8da2c47bf8b3de2524b24e1d8bfec1c2ce/VERSION

You can download Broker v2.3.5 here:

https://github.com/zeek/broker/releases

Once you download Broker and install the Python bindings using the instructions linked above, you can finally execute the Python script. Here is the script’s content:

import sys
import broker

# Setup endpoint and connect to Zeek.
with broker.Endpoint() as ep, \
    ep.make_subscriber("/topic/test") as sub:

    ep.listen("127.0.0.1", 60000)

    while True:
        (t, d) = sub.get()
        pong = broker.zeek.Event(d)
        print("received {}  --   {}".format(pong.name(), pong.args()))

        python_results = broker.zeek.Event("python_results", pong.args()[0]);
        ep.publish("/topic/test", python_results);

The script is relatively simple. The following lines set up a Zeek Broker listener on 127.0.0.1:60000 via Python:

https://github.com/keithjjones/zeek-python-broker-demo/blob/master/broker-test.py#L5-L8

Then, the Python script runs in an infinite loop pulling messages from Broker via the “get” function:

https://github.com/keithjjones/zeek-python-broker-demo/blob/master/broker-test.py#L11

Here “t” is the message’s topic, and “d” is the message’s raw data. Line 12 then translates the data pulled from Broker into a Python event object called “pong”. The name “pong” is not important, I wanted to show that this object could be named anything because the “.name()” function will tell you the true event name.

The last two lines in the script then create an event object and publishes it through Broker so that the “python_results” event will fire back in Zeek. The event will have the same argument that was originally passed to the Python process from Zeek (“pong.args()[0]”). Here is example output from the Python script:

received some_test_event  --   [(IPv4Address('172.20.32.121'), 47808/udp, IPv4Address('172.20.32.255'), 47808/udp)]
received some_test_event  --   [(IPv4Address('172.20.32.109'), 47808/udp, IPv4Address('172.20.32.255'), 47808/udp)]
received some_test_event  --   [(IPv4Address('172.20.32.200'), 47808/udp, IPv4Address('172.20.32.115'), 47808/udp)]
received some_test_event  --   [(IPv4Address('172.20.32.200'), 47808/udp, IPv4Address('172.20.32.112'), 47808/udp)]
received some_test_event  --   [(IPv4Address('172.20.32.200'), 47808/udp, IPv4Address('172.20.32.110'), 47808/udp)]
received some_test_event  --   [(IPv4Address('172.20.32.200'), 47808/udp, IPv4Address('172.20.32.121'), 47808/udp)]
received some_test_event  --   [(IPv4Address('172.20.32.200'), 47808/udp, IPv4Address('172.20.32.109'), 47808/udp)]

In the Zeek script content below you will find the event “python_results” is handled here:

https://github.com/keithjjones/zeek-python-broker-demo/blob/master/broker-test.zeek#L6

It just prints the arguments to the event. In “connection_state_remove” the script sends the “c$id” conn_id record to Python:

https://github.com/keithjjones/zeek-python-broker-demo/blob/master/broker-test.zeek#L11-L14

Therefore, this is the same data that will be printed when it is returned via “python_results”.

global test_topic = "/topic/test";

global some_test_event: event(c_id: conn_id);

event python_results(c_id: conn_id)
{
	print(cat("Got Python Results: ", c_id));
}

event connection_state_remove(c: connection)
{
    Broker::publish(test_topic, some_test_event, c$id);
}

event zeek_init()
{
	Broker::peer(addr_to_uri(127.0.0.1), 60000/tcp);
	Broker::subscribe(test_topic);
}

The two lines in “zeek_init” take care of connecting to the Python process before publishing the results to the “/topic/test” topic in Zeek’s Broker.

When running the Zeek script on a PCAP while the Python script is executing in another window, you will see the connection ID records printed from the Zeek and the Python processes in their respective windows as they are processed. This will prove that the data was transferred from your PCAP through Zeek, into Python over Zeek’s Broker, and back to Zeek again over Broker. This is example output from the Zeek script:

Got Python Results: [orig_h=172.20.32.200, orig_p=47808/udp, resp_h=172.20.32.115, resp_p=47808/udp]
Got Python Results: [orig_h=172.20.32.200, orig_p=47808/udp, resp_h=172.20.32.112, resp_p=47808/udp]
Got Python Results: [orig_h=172.20.32.200, orig_p=47808/udp, resp_h=172.20.32.110, resp_p=47808/udp]
Got Python Results: [orig_h=172.20.32.200, orig_p=47808/udp, resp_h=172.20.32.121, resp_p=47808/udp]
Got Python Results: [orig_h=172.20.32.200, orig_p=47808/udp, resp_h=172.20.32.109, resp_p=47808/udp]

Now imagine all the new types of processing you can do in Python that may have been more difficult with Zeek alone!

12 responses to “How To Connect Zeek To Python”

  1. Subhajit Avatar
    Subhajit

    I have posted few question in your youtube channel. This is regarding some unexpected behavior I am getting with added new_connection(c) and connection_state_remove(c) events in main.zeek. For https://github.com/cisagov/icsnpp-dnp3/blob/main/tests/traces/dnp3_example.pcap

    I am getting 10 new connection and 11 remove connection. One connection from sport=20000 to dport=20000, is not recognizing in new connection event. Why so?
    ——————————————————————————-
    code block inside python script:
    ——————————————————————————–
    if dnp3_event.name() == ‘DNP3_Extended::new_conn_added’:
    print(“got new connection: sip = {} , sport = {}, dip = {}, dport = {}”.format(dnp3_data[0],dnp3_data[1],dnp3_data[2],dnp3_data[3]))
    if dnp3_event.name() == ‘DNP3_Extended::conn_removed’:
    print(“got remove connection: sip = {} , sport = {} , dip = {}, dport = {}”.format(dnp3_data[0],dnp3_data[1],dnp3_data[2],dnp3_data[3]))

    ————————————————————————————–
    Output:-
    ————————————————————————————–
    got new connection: sip = 10.10.20.5 , sport = 55355/tcp, dip = 10.10.20.8, dport = 20000/tcp
    got new connection: sip = 10.10.20.5 , sport = 55356/tcp, dip = 10.10.20.8, dport = 20000/tcp
    got new connection: sip = 10.10.20.5 , sport = 55357/tcp, dip = 10.10.20.8, dport = 20000/tcp
    got new connection: sip = 10.10.20.5 , sport = 55358/tcp, dip = 10.10.20.8, dport = 20000/tcp
    got new connection: sip = 10.10.20.5 , sport = 55359/tcp, dip = 10.10.20.8, dport = 20000/tcp
    got new connection: sip = 10.10.20.5 , sport = 55361/tcp, dip = 10.10.20.8, dport = 20000/tcp
    got new connection: sip = 10.10.20.5 , sport = 55362/tcp, dip = 10.10.20.8, dport = 20000/tcp
    got new connection: sip = 10.10.20.5 , sport = 55363/tcp, dip = 10.10.20.8, dport = 20000/tcp
    got new connection: sip = 10.10.20.5 , sport = 55366/tcp, dip = 10.10.20.8, dport = 20000/tcp
    got new connection: sip = 10.10.20.5 , sport = 55370/tcp, dip = 10.10.20.8, dport = 20000/tcp
    got remove connection: sip = 10.10.20.5 , sport = 20000/tcp , dip = 10.10.20.8, dport = 20000/tcp
    got remove connection: sip = 10.10.20.5 , sport = 55355/tcp , dip = 10.10.20.8, dport = 20000/tcp
    got remove connection: sip = 10.10.20.5 , sport = 55356/tcp , dip = 10.10.20.8, dport = 20000/tcp
    got remove connection: sip = 10.10.20.5 , sport = 55357/tcp , dip = 10.10.20.8, dport = 20000/tcp
    got remove connection: sip = 10.10.20.5 , sport = 55358/tcp , dip = 10.10.20.8, dport = 20000/tcp
    got remove connection: sip = 10.10.20.5 , sport = 55359/tcp , dip = 10.10.20.8, dport = 20000/tcp
    got remove connection: sip = 10.10.20.5 , sport = 55361/tcp , dip = 10.10.20.8, dport = 20000/tcp
    got remove connection: sip = 10.10.20.5 , sport = 55362/tcp , dip = 10.10.20.8, dport = 20000/tcp
    got remove connection: sip = 10.10.20.5 , sport = 55363/tcp , dip = 10.10.20.8, dport = 20000/tcp
    got remove connection: sip = 10.10.20.5 , sport = 55366/tcp , dip = 10.10.20.8, dport = 20000/tcp
    got remove connection: sip = 10.10.20.5 , sport = 55370/tcp , dip = 10.10.20.8, dport = 20000/tcp

    Thanks

    1. drkeithjones Avatar

      Thanks. I responded on YouTube too. Here I’m getting 11 connection state removes and 11 new connections:


      $ cat test.zeek
      event new_connection(c: connection)
      {
      print "NEW", c$id;
      }

      event connection_state_remove(c: connection)
      {
      print "REMOVE", c$id;
      }

      $ zeek -Cr ../dnp3_example.pcap packages ./test.zeek
      NEW, [orig_h=10.10.20.5, orig_p=20000/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
      NEW, [orig_h=10.10.20.5, orig_p=55355/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
      NEW, [orig_h=10.10.20.5, orig_p=55356/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
      NEW, [orig_h=10.10.20.5, orig_p=55357/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
      NEW, [orig_h=10.10.20.5, orig_p=55358/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
      NEW, [orig_h=10.10.20.5, orig_p=55359/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
      NEW, [orig_h=10.10.20.5, orig_p=55361/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
      NEW, [orig_h=10.10.20.5, orig_p=55362/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
      NEW, [orig_h=10.10.20.5, orig_p=55363/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
      NEW, [orig_h=10.10.20.5, orig_p=55366/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
      NEW, [orig_h=10.10.20.5, orig_p=55370/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
      REMOVE, [orig_h=10.10.20.5, orig_p=55366/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
      REMOVE, [orig_h=10.10.20.5, orig_p=55363/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
      REMOVE, [orig_h=10.10.20.5, orig_p=55370/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
      REMOVE, [orig_h=10.10.20.5, orig_p=55361/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
      REMOVE, [orig_h=10.10.20.5, orig_p=55356/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
      REMOVE, [orig_h=10.10.20.5, orig_p=55358/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
      REMOVE, [orig_h=10.10.20.5, orig_p=55359/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
      REMOVE, [orig_h=10.10.20.5, orig_p=55355/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
      REMOVE, [orig_h=10.10.20.5, orig_p=55357/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
      REMOVE, [orig_h=10.10.20.5, orig_p=55362/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
      REMOVE, [orig_h=10.10.20.5, orig_p=20000/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]

  2. Subhajit Avatar
    Subhajit

    zeek script side code snippet:

    event new_connection(c: connection)
    {
    Broker::publish(dnp3_topic, new_conn_added, c);
    }

    event connection_state_remove(c: connection)
    {
    Broker::publish(dnp3_topic, conn_removed, c);
    }

  3. Subhajit Avatar
    Subhajit

    Yes I am also getting 11 NEW and 11 REMOVE from the zeek script but I think the mismatch is due to the delay in connection establishment with the endpoint. Before the first new_connection is coming (which is coincidentally sport 20000/tcp and dport 20000/tcp ) and the endpoint connection is yet not established, that’s why it is not getting published.

    1. drkeithjones Avatar

      If that’s the case you could suspend processing with: https://docs.zeek.org/en/master/scripts/base/bif/zeek.bif.zeek.html#id-suspend_processing first, then connect to your peer, then continue processing with https://docs.zeek.org/en/master/scripts/base/bif/zeek.bif.zeek.html#id-continue_processing to go through your pcap.

  4. Subhajit Avatar
    Subhajit

    Is there any option to delay or make sure that once peer is established then only publish events or is it implicitly handled? I can see schedule X secs but not able to run it for new_connection using schedule 30 sec { new_connection (c: connection) };
    Anyway I am not sure why python is not receiving first new connection event, as it is receiving all afterwards.

  5. Subhajit Avatar
    Subhajit

    Does zeek always invoke zeek_init() first? I put suspend_processing there and called Broker::peer() and continue_processing, issue is still there. Is peer() a blocking call? How to guarantee that once peer is connected then only processing is starting? I am just missing the first event, that’s why I am suspecting this reason. What else could be the reason for this? Will it be possible for you to try connecting to the python peer and check if you are getting all 11 new connections in your python script? Thanks

    1. drkeithjones Avatar

      No, establishing a peer doesn’t block and a peer can become disconnected. You will have to resume processing again when you see the peer established via this event: https://docs.zeek.org/en/master/scripts/base/bif/comm.bif.zeek.html#id-Broker::peer_added

      1. Subhajit Avatar
        Subhajit

        I have tried to continue_processing() inside peer_added(), but it seems the first new event is getting fired before that and because it is not yet established, peer is not receiving that message.

        NEW, [orig_h=10.10.20.5, orig_p=20000/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
        PEER ADDED, [id=212f24f5-6aa3-567b-aa22-23d19479859e, network=[address=127.0.0.1, bound_port=60000/tcp]]
        NEW, [orig_h=10.10.20.5, orig_p=55355/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
        NEW, [orig_h=10.10.20.5, orig_p=55356/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
        NEW, [orig_h=10.10.20.5, orig_p=55357/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
        NEW, [orig_h=10.10.20.5, orig_p=55358/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
        NEW, [orig_h=10.10.20.5, orig_p=55359/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
        NEW, [orig_h=10.10.20.5, orig_p=55361/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
        NEW, [orig_h=10.10.20.5, orig_p=55362/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
        NEW, [orig_h=10.10.20.5, orig_p=55363/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
        NEW, [orig_h=10.10.20.5, orig_p=55366/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
        NEW, [orig_h=10.10.20.5, orig_p=55370/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
        REMOVE, [orig_h=10.10.20.5, orig_p=20000/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
        REMOVE, [orig_h=10.10.20.5, orig_p=55355/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
        REMOVE, [orig_h=10.10.20.5, orig_p=55356/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
        REMOVE, [orig_h=10.10.20.5, orig_p=55357/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
        REMOVE, [orig_h=10.10.20.5, orig_p=55358/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
        REMOVE, [orig_h=10.10.20.5, orig_p=55359/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
        REMOVE, [orig_h=10.10.20.5, orig_p=55361/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
        REMOVE, [orig_h=10.10.20.5, orig_p=55362/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
        REMOVE, [orig_h=10.10.20.5, orig_p=55363/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
        REMOVE, [orig_h=10.10.20.5, orig_p=55366/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]
        REMOVE, [orig_h=10.10.20.5, orig_p=55370/tcp, resp_h=10.10.20.8, resp_p=20000/tcp]

        ———————————————-
        event zeek_init() {
        suspend_processing();
        Broker::peer(addr_to_uri(127.0.0.1), 60000/tcp);
        }

        event Broker::peer_added(ep: Broker::EndpointInfo, msg: string)
        {
        print “PEER ADDED”, ep;
        continue_processing();
        }
        event new_connection(c: connection)
        {
        print “NEW”, c$id;
        Broker::publish(my_topic, new_conn_added, c);
        }
        event connection_state_remove(c: connection)
        {
        print “REMOVE”, c$id;
        Broker::publish(my_topic, conn_removed, c);
        }

        Please check if I am doing any mistake.
        Thanks

        1. drkeithjones Avatar

          I also was able to replicate what you experienced. I checked with one of the core developers of Zeek and he said this happens because Zeek has to read at least 1 packet in before it can do things like suspend processing. When we call suspend processing, one packet has already been read so we’re really saying any future connections after the first connection.

Leave a Reply

Your email address will not be published. Required fields are marked *