Integrate Spark Streaming, Kafka and Logstash to read and analyze logs in real time
Below are the simple steps to integrate Spark Streaming with Kafka and Logstash:
Installation of Logstash:
1. The first few steps install and configure Logstash. Start by creating a yum repository file:
sudo vi /etc/yum.repos.d/logstash.repo
2. Add the following lines to the file:
[logstash-2.3]
name=Logstash repository for 2.3.x packages
baseurl=https://packages.elastic.co/logstash/2.3/centos
gpgcheck=1
gpgkey=https://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1
3. sudo yum install logstash
4. cd /opt/logstash
5. Create a config file named logstash-kafka.conf with the following content:
input {
  file {
    path => "/opt/gen_logs/logs/access.log"
  }
}
output {
  kafka {
    codec => plain {
      format => "%{message}"
    }
    topic_id => "logstash"
  }
}
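With format => "%{message}", the plain codec sends only the original log line to Kafka instead of a serialized event that also carries fields such as @timestamp, host and path. The sketch below imitates that %{field} substitution in plain Scala, assuming an event can be modeled as a Map[String, String]; formatEvent is a hypothetical helper for illustration, not a Logstash API. Like Logstash, it leaves a reference to a missing field as literal text.

```scala
import scala.util.matching.Regex

object SprintfSketch {
  // Matches references such as %{message} or %{host}; group 1 is the field name.
  private val FieldRef: Regex = """%\{([^}]+)\}""".r

  // Hypothetical helper: replace each %{field} with the event's value for that field.
  // References to fields the event does not have are left unchanged.
  def formatEvent(format: String, event: Map[String, String]): String =
    FieldRef.replaceAllIn(format, m =>
      Regex.quoteReplacement(event.getOrElse(m.group(1), m.matched)))

  def main(args: Array[String]): Unit = {
    val event = Map(
      "message" -> "127.0.0.1 - - [10/Oct/2016:13:55:36 +0000] \"GET / HTTP/1.1\" 200 512",
      "host"    -> "web01",
      "path"    -> "/opt/gen_logs/logs/access.log"
    )
    // Only the raw log line is produced, as with format => "%{message}".
    println(formatEvent("%{message}", event))
  }
}
```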
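6. Check the configuration with the following command:
bin/logstash -f logstash-kafka.conf --configtest
7. Start Logstash. Events are only delivered once the Kafka broker is running, so you may want to complete steps 8 and 9 before this one:
bin/logstash -f logstash-kafka.conf
8. Start the ZooKeeper server (from the Kafka installation directory):
bin/zookeeper-server-start.sh config/zookeeper.properties
9. Start the Kafka server:
bin/kafka-server-start.sh config/server.properties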
10. Download Scala and the Scala IDE.
11. Create a Scala project in the IDE.
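The code in step 13 imports KafkaUtils and kafka.serializer.StringDecoder, which come from Spark's Kafka 0.8 integration module rather than from Spark core, so the project needs that library on its classpath. Assuming an sbt-based project, a build.sbt along these lines should work; the Spark 1.6.3 and Scala 2.10.6 versions are assumptions, so adjust them to your environment.

```scala
// build.sbt: a minimal sketch; the versions below are assumptions.
name := "KafkaSparkStreaming"

scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  // Streaming API: StreamingContext, Seconds, DStream (pulls in spark-core for SparkConf)
  "org.apache.spark" %% "spark-streaming" % "1.6.3",
  // Kafka 0.8 direct-stream integration: KafkaUtils, kafka.serializer.StringDecoder
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
)
```

On Spark 2.x the same integration is published under the artifact name spark-streaming-kafka-0-8.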
12. Create a Scala object named KafkaSparkStreaming.
13. Add the following code to the Scala file:
package com.sd

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder

object KafkaSparkStreaming {
  def main(args: Array[String]): Unit = {
    // Run locally on all available cores
    val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaSparkStreaming")
    // Create the streaming context with a 10-second batch interval
    val ssc = new StreamingContext(conf, Seconds(10))
    // Topic that the Logstash kafka output writes to
    val topic = Set("logstash")
    // Address of the local Kafka broker started in step 9
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "127.0.0.1:9092")
    // Direct (receiver-less) stream of (key, value) pairs; the value is the log line
    val directMessage = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topic)
    // Print the number of messages received in each batch
    directMessage.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}
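directMessage.count().print() prints one number per batch: every Seconds(10), Spark Streaming forms a batch from (roughly) the records that became available since the previous batch and counts them. The plain-Scala sketch below imitates that grouping on an in-memory list of (arrival time, line) pairs, assuming arrival times in milliseconds; countPerBatch is a hypothetical helper and does not use Spark.

```scala
object BatchCountSketch {
  // Hypothetical helper: assign each record to a batch by integer division of its
  // arrival time by the batch interval, then count records per batch.
  // Empty batches between the first and last non-empty ones are reported as 0,
  // just as count() prints 0 for a batch with no messages.
  def countPerBatch(records: Seq[(Long, String)], batchMillis: Long): Seq[(Long, Long)] = {
    require(batchMillis > 0, "batch interval must be positive")
    if (records.isEmpty) Seq.empty
    else {
      val counts = records
        .groupBy { case (t, _) => t / batchMillis }
        .map { case (batch, rs) => batch -> rs.size.toLong }
      val first = counts.keys.min
      val last = counts.keys.max
      (first to last).map(b => b -> counts.getOrElse(b, 0L))
    }
  }

  def main(args: Array[String]): Unit = {
    val batchMillis = 10000L // matches Seconds(10) in the streaming job
    val records = Seq(
      1000L  -> "GET /index.html",
      4500L  -> "GET /about.html",
      12000L -> "POST /login",
      31000L -> "GET /logout"
    )
    countPerBatch(records, batchMillis).foreach { case (batch, n) =>
      println(s"batch starting at ${batch * batchMillis} ms: $n")
    }
  }
}
```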
14. Run the program. The console prints the number of log lines received in each 10-second batch.
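Counting lines is only a first step toward analyzing the logs. As one illustration, the sketch below extracts the HTTP status code from each access-log line and tallies the codes, assuming the lines in /opt/gen_logs/logs/access.log follow the Apache common/combined log format (the post does not show the format). statusOf and statusCounts are hypothetical names, and the logic is plain Scala so it can be tried without Spark.

```scala
object StatusCountSketch {
  // Assumes Apache common/combined log format, e.g.
  // 127.0.0.1 - - [10/Oct/2016:13:55:36 +0000] "GET / HTTP/1.1" 200 512
  // The status code is the three-digit field right after the quoted request.
  private val StatusPattern = """^\S+ \S+ \S+ \[[^\]]+\] "[^"]*" (\d{3})\b""".r

  // Hypothetical helper: the status code, or None for lines that don't match.
  def statusOf(line: String): Option[String] =
    StatusPattern.findFirstMatchIn(line).map(_.group(1))

  // Tally status codes across a batch of lines, skipping unparseable ones.
  def statusCounts(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(line => statusOf(line).toList)
      .groupBy(identity)
      .map { case (code, xs) => code -> xs.size }

  def main(args: Array[String]): Unit = {
    val batch = Seq(
      "127.0.0.1 - - [10/Oct/2016:13:55:36 +0000] \"GET / HTTP/1.1\" 200 512",
      "10.0.0.5 - - [10/Oct/2016:13:55:40 +0000] \"GET /missing HTTP/1.1\" 404 0",
      "10.0.0.7 - - [10/Oct/2016:13:55:41 +0000] \"GET /index.html HTTP/1.1\" 200 1024",
      "not an access log line"
    )
    println(statusCounts(batch))
  }
}
```

Inside the streaming job, the same parser could be applied per batch with something like directMessage.map(_._2).flatMap(line => StatusCountSketch.statusOf(line).toList).countByValue().print(), since the direct stream yields (key, value) pairs and the log line is the value.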