Integrate Spark Streaming, Kafka and Logstash to read and analyze logs in real time

Below are the steps to integrate Spark with Kafka and Logstash:

Installation of Logstash:

1. Create a yum repository file for Logstash:
     sudo vi /etc/yum.repos.d/logstash.repo

2. Add the lines below to the file:
    [logstash-2.3]
    name=Logstash repository for 2.3.x packages
    baseurl=https://packages.elastic.co/logstash/2.3/centos
    gpgcheck=1
    gpgkey=https://packages.elastic.co/GPG-KEY-elasticsearch
    enabled=1
3. Install Logstash:
     sudo yum install logstash
4. Change to the Logstash installation directory:
     cd /opt/logstash
5. Create a config file logstash-kafka.conf and add the content below:

    input {
        file {
            path => "/opt/gen_logs/logs/access.log"
        }
    }

    output {
        kafka {
            codec => plain {
                format => "%{message}"
            }
            topic_id => "logstash"
        }
    }

6. Check the configuration using the command below:
          bin/logstash -f logstash-kafka.conf --configtest

7. Start Logstash:
          bin/logstash -f logstash-kafka.conf

8. Start the ZooKeeper server (from the Kafka installation directory):
          bin/zookeeper-server-start.sh config/zookeeper.properties

9. Start the Kafka server:
         bin/kafka-server-start.sh config/server.properties

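10. Download Scala and Scala IDE

11. Create a Scala project in the IDE and add the Spark Streaming and Spark Streaming Kafka libraries to its build path (the code below imports from both)

12. Create a Scala object named KafkaSparkStreaming

13. Write the code below in your Scala file: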

      package com.sd
      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.StreamingContext
      import org.apache.spark.streaming.Seconds
      import org.apache.spark.streaming.kafka.KafkaUtils
      import kafka.serializer.StringDecoder

      object KafkaSparkStreaming {
          def main(args: Array[String]): Unit = {
              // Run locally, using all available cores
              val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaSparkStreaming")
              // Process the stream in 10-second batches
              val ssc = new StreamingContext(conf, Seconds(10))
              // Topic name must match topic_id in logstash-kafka.conf
              val topic = Set("logstash")
              val kafkaParams = Map[String, String]("metadata.broker.list" -> "127.0.0.1:9092")
              // Read (key, value) messages directly from the Kafka broker
              val directMessage = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
              // Print the number of messages received in each batch
              directMessage.count().print()
              ssc.start()
              ssc.awaitTermination()
          }
      }

14. Run the program. The console prints the number of log lines received in each 10-second batch.
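
The program above only counts lines. As a possible next step toward analyzing the logs, the sketch below parses each line and counts requests per HTTP status code. It assumes access.log uses the Apache common/combined log format; the names AccessLogStats, LogLine, parseLine and statusCounts are hypothetical helpers introduced for illustration and are not part of Spark, Kafka or Logstash.

```scala
// Sketch only: assumes Apache common/combined log format, for example
// 1.2.3.4 - - [10/Oct/2016:13:55:36 -0700] "GET /index.html HTTP/1.1" 200 2326
object AccessLogStats {
  // Capture groups: client IP, timestamp, HTTP method, path, status code, response size
  private val LogPattern =
    """^(\S+) \S+ \S+ \[([^\]]+)\] "(\S+) (\S+)[^"]*" (\d{3}) (\S+).*$""".r

  final case class LogLine(ip: String, method: String, path: String, status: Int)

  // Returns None for lines that do not match the assumed format
  def parseLine(line: String): Option[LogLine] = line match {
    case LogPattern(ip, _, method, path, status, _) =>
      Some(LogLine(ip, method, path, status.toInt))
    case _ => None
  }

  // Number of successfully parsed lines per HTTP status code
  def statusCounts(lines: Seq[String]): Map[Int, Int] =
    lines.flatMap(l => parseLine(l).toList)
      .groupBy(_.status)
      .map { case (status, matched) => status -> matched.size }
}
```

In the streaming job, the same parser could be applied to the message values before counting, for example by mapping directMessage to its second tuple element, parsing each value with parseLine, and reducing by status code in place of the plain count(). Lines that do not match the assumed format are dropped silently, so a different log layout would need a different pattern.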
