Data Pipeline from Kafka to Elasticsearch using Logstash

Kafka is a popular open-source platform for handling real-time data feeds. It is used for building real-time data pipelines and streaming applications. Elasticsearch is a search engine based on the Lucene library. It provides a distributed, multitenant-capable full-text search engine with an HTTP web interface and schema-free JSON documents.

Logstash is an open-source data collection engine with real-time pipelining capabilities. It is a part of the Elastic Stack and is used to transfer data from different sources to Elasticsearch.

This post demonstrates how to configure Logstash to consume data from Kafka topics, modify it, and publish it to Elasticsearch. I have created a Git repository for this purpose at https://github.com/sergouniotis/logstash-kafka-es.git. To clone the repository, run the following command:

git clone https://github.com/sergouniotis/logstash-kafka-es.git

The repository contains a docker-compose.yml file. Running `docker-compose up` will start the containers specified in the file. The main containers are listed below; a minimal sketch of the compose file follows the list:

  • kafka: the Kafka broker, with a topic named public.events
  • logstash: the Logstash server, with a bind mount to the local ./pipeline/etl.conf
  • elasticsearch: the Elasticsearch server
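
The sketch below shows roughly how such a compose file can wire the services together. The image names, version tags, and ports are illustrative assumptions, so consult the repository for the actual file:

version: "3"
services:

  kafka:
    # Illustrative image; the repository may use a different one.
    # A ZooKeeper service, required by older Kafka images, is omitted for brevity.
    image: bitnami/kafka:latest
    ports:
      - "9092:9092"

  logstash:
    # Illustrative version tag
    image: docker.elastic.co/logstash/logstash:6.8.23
    volumes:
      # Bind mount of the local pipeline configuration
      - ./pipeline/etl.conf:/usr/share/logstash/pipeline/etl.conf
    depends_on:
      - kafka
      - elasticsearch

  elasticsearch:
    # Illustrative version tag
    image: docker.elastic.co/elasticsearch/elasticsearch:6.8.23
    environment:
      - discovery.type=single-node
    ports:
      - "9200:9200"

All services declared in a single compose file join the same default network, so they can reach each other by service name (kafka, elasticsearch).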

The main configuration is provided by the file named etl.conf, located in the pipeline folder. The pipeline configuration in this file is shown below:

input {
  kafka {
    codec => json
    bootstrap_servers => "kafka:9092"
    topics => ["public.events"]
    client_id => "event-consumer"
  }
}

filter {
  # Copy the event_id from the payload into a dedicated id field;
  # the output below uses it as the Elasticsearch document ID.
  mutate {
    add_field => { "id" => "%{event_id}" }
  }
  # Drop the @version field that Logstash adds to every event.
  mutate {
    remove_field => ["@version"]
  }
}

output {
  # Echo every event to stdout, which is useful for debugging.
  stdout {
    codec => json_lines
  }
  elasticsearch {
    hosts => "elasticsearch"
    index => "events"
    document_id => "%{id}"
    action => "update"
    doc_as_upsert => true
  }
}

The Kafka input plugin parameters are explained in more detail in the Logstash documentation: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
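
Beyond the parameters used above, a few options are commonly tuned. The values below are a sketch and are not part of the repository's configuration:

kafka {
  codec => json
  bootstrap_servers => "kafka:9092"
  topics => ["public.events"]
  client_id => "event-consumer"
  # Consumer group id; defaults to "logstash" when omitted
  group_id => "logstash-etl"
  # Number of consumer threads, ideally matching the topic's partition count
  consumer_threads => 2
  # Where to start reading when no committed offset exists
  auto_offset_reset => "earliest"
}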

The hostnames kafka and elasticsearch used above are resolvable because all of the Docker containers belong to the same network, as described in the docker-compose.yml file.

The Elasticsearch output plugin executes an upsert, which means that it will insert the document if it does not exist and update it if it does.
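
As a consequence, re-sending an event with the same event_id updates the existing document rather than creating a duplicate. One way to check this, once the stack is running, is to watch the document count before and after producing the same event twice:

curl -X GET "http://localhost:9200/events/_count"

The returned count should stay at 1.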

A sample file named “event.json” is provided in the repository, and the following command will send the event to the Kafka topic named “public.events”:

(echo -n "1:" && cat event.json | jq -cM .) | docker exec -i events_kafka_1 kafka-console-producer.sh --bootstrap-server localhost:9092 --topic public.events --property "parse.key=true" --property "key.separator=:"
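
For reference, the payload in event.json looks roughly like the following; it is reconstructed here from the indexed document shown further below, so the exact file in the repository may differ:

{
  "event_id": 12345,
  "event_type": "order_placed",
  "timestamp": "2022-12-29T12:34:56Z",
  "customer_id": 67890,
  "total_cost": 29.97,
  "order_items": [
    { "product_id": 1, "quantity": 2, "price": 9.99 },
    { "product_id": 2, "quantity": 1, "price": 14.99 }
  ]
}

The echo -n "1:" part prepends a record key to the JSON payload, which the console producer splits off using the key.separator property.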

Finally, verify that the event was stored correctly in the Elasticsearch index named “events”. This can be done by executing the following command:

curl -X GET "http://localhost:9200/events/_search"

and the response is:

{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 1,
    "hits": [
      {
        "_index": "events",
        "_type": "doc",
        "_id": "12345",
        "_score": 1,
        "_source": {
          "@timestamp": "2022-12-29T09:46:37.629Z",
          "order_items": [
            {
              "quantity": 2,
              "price": 9.99,
              "product_id": 1
            },
            {
              "quantity": 1,
              "price": 14.99,
              "product_id": 2
            }
          ],
          "id": "12345",
          "timestamp": "2022-12-29T12:34:56Z",
          "event_type": "order_placed",
          "event_id": 12345,
          "customer_id": 67890,
          "total_cost": 29.97
        }
      }
    ]
  }
}

In this tutorial, we learned how to create a data pipeline from Kafka to Elasticsearch using Logstash. We created a Kafka topic, sent a sample event to it, and used Logstash to consume the event and upsert it into an Elasticsearch index.
