- Create a new project in IntelliJ with the following versions
sbt version = 1.6.2
JDK = 11 (Amazon Corretto); switch to Java 8 to get rid of the warnings
scala version = 2.12.15
Spark Version = 3.2.1
Confluent Version = 7.0.1
- Add the following script to
build.sbt
// Build definition for the covid-tweet-analysis project.
// Version numbers shared by several artifacts are declared once so that
// related modules are always bumped together.
val sparkVersion = "3.2.1"
val twitter4jVersion = "4.0.7"
val akkaVersion = "2.6.19"

ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / scalaVersion := "2.12.15"

// Spark (core, streaming, SQL)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion
)

// Twitter4j and the Bahir Twitter receiver for Spark Streaming
// NOTE(review): Bahir 2.4.0 is used alongside Spark 3.2.1 — confirm compatibility.
libraryDependencies ++= Seq(
  "org.twitter4j" % "twitter4j-core" % twitter4jVersion,
  "org.twitter4j" % "twitter4j-stream" % twitter4jVersion,
  "org.apache.bahir" %% "spark-streaming-twitter" % "2.4.0"
)

// Mongo Spark connector
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "3.0.1"

// Kafka source/sink for Spark SQL
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion

// JSON
libraryDependencies += "com.typesafe.play" %% "play-json" % "2.9.2"

// HBC Java client
libraryDependencies += "com.twitter" % "hbc-core" % "2.2.0"

// Akka
// NOTE(review): akka-agent is pinned to 2.5.32 while every other Akka module
// is on 2.6.19 — confirm this version mix is intended.
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % akkaVersion,
  "com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
  "com.typesafe.akka" %% "akka-remote" % akkaVersion,
  "com.typesafe.akka" %% "akka-agent" % "2.5.32",
  "com.typesafe.akka" %% "akka-testkit" % akkaVersion % Test,
  "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion,
  "com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion % Test,
  "com.typesafe.akka" %% "akka-stream" % akkaVersion,
  "com.typesafe.akka" %% "akka-http" % "10.2.9"
)

lazy val root = (project in file("."))
  .settings(
    name := "covid-tweet-analysis"
  )
- Start kafka and zookeeper with the following command:
Installation:
- Because Kafka reportedly works only on Java 1.8 (unverified), we need to point
JAVA_HOME
to the 1.8 JDK, set CONFLUENT_HOME
to the Confluent install we just unpacked at /Users/shashank/confluent-7.0.1, and add its bin directory to PATH
# List out the java versions and their path
~ /usr/libexec/java_home -V
# > Output
Matching Java Virtual Machines (3):
17.0.1 (arm64) "Oracle Corporation" - "Java SE 17.0.1" /Library/Java/JavaVirtualMachines/jdk-17.0.1.jdk/Contents/Home
16.0.2 (x86_64) "Amazon.com Inc." - "Amazon Corretto 16" /Users/shashank/Library/Java/JavaVirtualMachines/corretto-16.0.2/Contents/Home
1.8.0_322 (x86_64) "Amazon" - "Amazon Corretto 8" /Users/shashank/Library/Java/JavaVirtualMachines/corretto-1.8.0_322/Contents/Home
~ vi .zshrc
# Now add the 3 exports below to .zshrc
# Export Amazon corretto 8 which is 1.8 version of java
~ export JAVA_HOME=/Users/shashank/Library/Java/JavaVirtualMachines/corretto-1.8.0_322/Contents/Home
# For confluent to work (find stable solution)
# Set HOME For confluent
~ export CONFLUENT_HOME=/Users/shashank/confluent-7.0.1
# Add to PATH (temporary, I guess)
~ export PATH=$PATH:$CONFLUENT_HOME/bin
Setup:
# Run confluent services
~ confluent local services start
Now go to localhost:9021 and create a topic test-topic
with default settings
~ kafka-console-producer --broker-list localhost:9092 --topic covid-tweet
>{"name": "Shantanu", "age": 22, "gender": "Male"}
>{"name": "Bhavesh", "age": 21, "gender": "Male"}
Open another terminal for consumer
~ kafka-console-consumer --bootstrap-server localhost:9092 --topic covid-tweet --from-beginning
>{"name": "Shantanu", "age": 22, "gender": "Male"}
>{"name": "Bhavesh", "age": 21, "gender": "Male"}
# Stop service
~ confluent local services stop
# Delete the metadata
~ confluent local destroy
# Prerequisite for kafka
~ brew install java
# Install kafka (up to 5 min)
~ brew install kafka
# List services
~ brew services list
# You may need to uncomment the last 2 lines as shown below
~ vi /opt/homebrew/etc/kafka/server.properties
- Change the following in the file
/opt/homebrew/etc/kafka/server.properties
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners =PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://localhost:9092
# Start the services
~ brew services start zookeeper
~ brew services start kafka
# Create kafka topic
~ kafka-topics --create --topic covid-tweet --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4
# Create producer console
~ kafka-console-producer --broker-list localhost:9092 --topic covid-tweet
> send first message
> send second message
> send third message
# Create consumer console in another terminal
~ kafka-console-consumer --bootstrap-server localhost:9092 --topic covid-tweet --from-beginning
send first message
send second message
send third message
TwitterToKafka.scala
topic name | value |
---|---|
"key": "test-topic" | "value": { "$binary" : "eyJjcmVhdGVkX2F" } |
"key": "test-topic" | "value": { "$binary" : "JadJjdafadDSFSF" } |
KafkaToMongo.scala
- Example Document in MongoDB
KEY | VALUE |
---|---|
text | RT @RepThomasMassie: You’re at least 2 |
created_at | Wed Apr 06 15:55:03 +0000 2022 |
user_id | 980238526305460224 |
Server.scala
Go to localhost:8086/api/all
Result
[
{
"_id": {
"oid": "624dd4a49898fd74d9963671"
},
"created_at": "Wed Apr 06 15:30:55 +0000 2022",
"entities_hashtags": "[]",
"id": "1511728162857619458",
"lang": "en",
"text": "@oscarhumb @LangmanVince Yeah Biden is a piece of sh*t liar and a failure!\n\nWhat kind of stupidty does it take to b… https://t.co/L3p8ncFeFa",
"truncated": "true",
"user_location": "fabulous Las Vegas, NV",
"user_name": "A Devoted Yogi",
"user_screen_name": "ADevotedYogi"
},
{
"_id": {
"oid": "624dd4a49898fd74d9963672"
},
"created_at": "Wed Apr 06 15:30:55 +0000 2022",
"entities_hashtags": "[]",
"id": "1511728163168174093",
"lang": "fr",
"text": "RT @Belzeboule_: @mev479 @marc_2969 @ch_coulon L'abrogation du pass est dans le programme de Zemmour. Et c'est bien une abrogation, pas une…",
"truncated": "false",
"user_location": "Montauban, France",
"user_name": "GUERMACHE BHARIA",
"user_screen_name": "GuermacheBharia"
}
]