Integrate Spark Streaming, Kafka, and Logstash to read and analyze logs in real time

Below are the simple steps to integrate Spark with Kafka and Logstash:

Installation of Logstash:

1. The first few steps install and configure Logstash. Create a repository file:
     sudo vi /etc/yum.repos.d/logstash.repo

2. Add the following lines to the file (a note on the GPG key follows):
    [logstash-2.3]
    name=Logstash repository for 2.3.x packages
    baseurl=https://packages.elastic.co/logstash/2.3/centos
    gpgcheck=1
    gpgkey=https://packages.elastic.co/GPG-KEY-elasticsearch
    enabled=1
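
    yum can normally import the signing key automatically from the gpgkey URL above, but if it complains about the key, you can import it manually first:

        sudo rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch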
3. sudo yum install logstash
4. cd /opt/logstash
5. Create a config file named logstash-kafka.conf and add the content below (an optional debug output is sketched right after it):

    input {
      file {
        path => "/opt/gen_logs/logs/access.log"
      }
    }

    output {
      kafka {
        codec => plain {
          format => "%{message}"
        }
        topic_id => "logstash"
      }
    }
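
    While testing, it helps to see the events Logstash is shipping. One option (an optional addition on my part, not required by the pipeline) is to add a stdout output with the rubydebug codec inside the output { } block, and remove it once everything works:

        stdout {
          codec => rubydebug
        }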

6. Check the configuration with the command below:
          bin/logstash -f logstash-kafka.conf --configtest

7. Start Logstash:
          bin/logstash -f logstash-kafka.conf

8. Start the ZooKeeper server (run from your Kafka installation directory; ZooKeeper and Kafka must be up before Logstash can publish to the topic, so use a separate terminal if Logstash is already running):
          bin/zookeeper-server-start.sh config/zookeeper.properties

9. Start the Kafka server (a quick verification command follows):
         bin/kafka-server-start.sh config/server.properties
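
    Before wiring up Spark, it is worth confirming that log lines are actually landing in the topic. With a Kafka version from the same era as Logstash 2.3 (0.8/0.9), the bundled console consumer reads via ZooKeeper; on newer brokers the flag would be --bootstrap-server localhost:9092 instead:

        bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic logstash --from-beginning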

10. Download Scala and the Scala IDE.

11. Create a Scala project in the IDE.

12. Create a Scala object named KafkaSparkStreaming.

13. Write the code below in your Scala file (the build dependencies it needs are sketched after the listing):

      package com.sd

      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      import org.apache.spark.streaming.kafka.KafkaUtils
      import kafka.serializer.StringDecoder

      object KafkaSparkStreaming {
        def main(args: Array[String]): Unit = {
          val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaSparkStreaming")
          // 10-second batch interval: each micro-batch covers 10 seconds of log lines
          val ssc = new StreamingContext(conf, Seconds(10))
          val topics = Set("logstash")
          val kafkaParams = Map[String, String]("metadata.broker.list" -> "127.0.0.1:9092")
          // Receiver-less direct stream; keys and values are decoded as plain strings
          val directMessage = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
          // Print how many log lines arrived in each batch
          directMessage.count().print()
          ssc.start()
          ssc.awaitTermination()
        }
      }
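
    The imports above come from the Spark 1.x streaming API together with the Kafka 0.8 direct-stream connector. As a rough sketch of the build, assuming sbt, Scala 2.10, and Spark 1.6.2 (adjust the versions to match your cluster):

      name := "kafka-spark-streaming"

      scalaVersion := "2.10.6"

      libraryDependencies ++= Seq(
        "org.apache.spark" %% "spark-core"            % "1.6.2",
        "org.apache.spark" %% "spark-streaming"       % "1.6.2",
        "org.apache.spark" %% "spark-streaming-kafka" % "1.6.2"
      )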

14. Run the program and you will see the log line count printed to the console every 10 seconds. A sketch of a slightly richer analysis follows.
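
    Counting lines is just a smoke test. The value of each (key, message) pair delivered by the direct stream is the raw access-log line, so you can filter and aggregate it directly. A minimal sketch that counts 404 responses per batch, assuming the common access-log layout where the status code is the ninth whitespace-separated field; these lines would replace the directMessage.count().print() call above:

      val lines = directMessage.map(_._2)   // the Kafka message value is the raw log line
      val notFound = lines.filter { line =>
        val fields = line.split(" ")
        fields.length > 8 && fields(8) == "404"   // status code position in common log format
      }
      notFound.count().print()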
