This series goes through converting some basic Java Kafka clients to Scala - step by step. It is important to understand that it is written from my viewpoint - someone who has played with Scala, likes it, but has never really had time to get into it.
In the previous step we created a basic producer and consumer in Scala, but it was very close to a line-by-line conversion. Let's try for something closer to normal Scala - and let's get the config values out into a configuration file.
First change - instead of having an object with a main method, let's actually state that it's an App.
object X {
  def main(args: Array[String]): Unit = {
    // Application code
  }
}

changes to

object X extends App {
  // Application code
}
The config values are hard-coded - let's do something about that.
For this we'll use the PureConfig library.
We need to add the following to build.sbt:
"com.github.pureconfig" %% "pureconfig" % "0.12.0"
This is added to the list of libraryDependencies that is already present.
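As a rough sketch, the dependency list might now look something like this (the kafka-clients entry and its version are an assumption carried over from the earlier steps):

libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients" % "2.3.0", // assumed from earlier steps
  "com.github.pureconfig" %% "pureconfig" % "0.12.0"
)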
We can add our configuration values to files under src/main/resources called application.conf - one per client. The producer's application.conf:
client-id = "pureconfig-producer"
bootstrap-servers = "localhost:29092"
topic = "pureconfig-topic"
serializer = "org.apache.kafka.common.serialization.StringSerializer"
The consumer's application.conf:

group-id = "pureconfig-consumer"
bootstrap-servers = "localhost:29092"
topic = "pureconfig-topic"
deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
enable-auto-commit = "true"
auto-commit-interval-ms = "1000"
auto-offset-reset = "earliest"
PureConfig can load the values into a matching case class - by default the kebab-case keys in the file map onto camelCase field names. For the producer:

case class Config(clientId: String,
                  bootstrapServers: String,
                  topic: String,
                  serializer: String)

and for the consumer:

case class Config(groupId: String,
                  bootstrapServers: String,
                  enableAutoCommit: String,
                  autoCommitIntervalMs: String,
                  autoOffsetReset: String,
                  deserializer: String,
                  topic: String)
We can choose how to handle the config load so that we know whether the configuration was loaded OK or not.
One method we can call is loadOrThrow, which will throw an exception if it can't load the configuration.
val conf = ConfigSource.default.loadOrThrow[Config]
Another option is to use load - this returns an Either, where the Left holds ConfigReaderFailures and the Right holds a configuration matching the case class we asked for.
ConfigSource.default.load[Config] match {
  case Left(errors) => ...
  case Right(config: Config) => ...
}
There is one catch that stops us using our nice neat Config case classes as-is: the Kafka clients (KafkaProducer/KafkaConsumer) require a java.util.Properties object.
For now I've simply added an asProperties method to each Config case class. However, there are other ways of handling this conversion (usually the word implicit turns up here) - but for this example, let's keep it simple.
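For illustration only, here's a minimal sketch of what the implicit flavour might look like - a hypothetical ConfigSyntax object (not used in the code below) that bolts asProperties onto the producer's Config from outside, keeping the case class free of Kafka-specific code:

import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig._

// Hypothetical sketch: an extension method added via an implicit class,
// assuming the producer's Config case class defined above.
object ConfigSyntax {
  implicit class ConfigOps(val config: Config) extends AnyVal {
    def asProperties: Properties = {
      val props = new Properties()
      props.put(CLIENT_ID_CONFIG, config.clientId)
      props.put(BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers)
      props.put(KEY_SERIALIZER_CLASS_CONFIG, config.serializer)
      props.put(VALUE_SERIALIZER_CLASS_CONFIG, config.serializer)
      props
    }
  }
}

With import ConfigSyntax._ in scope, config.asProperties reads the same at the call site - the conversion just lives outside the case class.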
We'll throw in some other small tidying up - this gives the following clients. First the producer:
import java.time.Duration
import java.util.Properties

import org.apache.kafka.clients.producer.ProducerConfig.{BOOTSTRAP_SERVERS_CONFIG, CLIENT_ID_CONFIG, KEY_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_CLASS_CONFIG}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import pureconfig.ConfigSource
import pureconfig.generic.auto._

case class Config(clientId: String,
                  bootstrapServers: String,
                  topic: String,
                  serializer: String) {
  def asProperties: Properties = {
    val props = new Properties()
    props.put(CLIENT_ID_CONFIG, clientId)
    props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props.put(KEY_SERIALIZER_CLASS_CONFIG, serializer)
    props.put(VALUE_SERIALIZER_CLASS_CONFIG, serializer)
    props
  }
}

object ConfigProducer extends App {
  ConfigSource.default.load[Config] match {
    case Left(errors) =>
      println(errors)
      System.exit(1)
    case Right(config: Config) =>
      println("*** Starting Config Producer ***")
      val producer = new KafkaProducer[String, String](config.asProperties)
      (1 to 5).foreach { i =>
        producer.send(new ProducerRecord[String, String](config.topic, s"key-$i", s"value-$i"))
      }
      producer.close(Duration.ofMillis(100))
      println("### Stopping Config Producer ###")
  }
}
And the consumer:

import java.time.Duration
import java.util.Properties

import org.apache.kafka.clients.consumer.ConsumerConfig._
import org.apache.kafka.clients.consumer.KafkaConsumer
import pureconfig.ConfigSource
import pureconfig.generic.auto._

import scala.collection.JavaConverters._

case class Config(groupId: String,
                  bootstrapServers: String,
                  enableAutoCommit: String,
                  autoCommitIntervalMs: String,
                  autoOffsetReset: String,
                  deserializer: String,
                  topic: String) {
  def asProperties: Properties = {
    val props = new Properties()
    props.put(GROUP_ID_CONFIG, groupId)
    props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props.put(ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit)
    props.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs)
    props.put(AUTO_OFFSET_RESET_CONFIG, autoOffsetReset)
    props.put(KEY_DESERIALIZER_CLASS_CONFIG, deserializer)
    props.put(VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
    props
  }
}

object ConfigConsumer extends App {
  ConfigSource.default.load[Config] match {
    case Left(errors) =>
      println(errors)
      System.exit(1)
    case Right(config: Config) =>
      println("*** Starting Config Consumer ***")
      val consumer = new KafkaConsumer[String, String](config.asProperties)
      try {
        consumer.subscribe(List(config.topic).asJava)
        while (true) {
          val records = consumer.poll(Duration.ofMillis(100)).asScala
          for (record <- records) {
            println(s"offset = ${record.offset}, key = ${record.key}, value = ${record.value}")
          }
        }
      } finally {
        consumer.close()
      }
  }
}
For each client - we can check it compiles and run it using sbt as before:
sbt compile
sbt's run command will also find objects that extend App, so we can still run:
sbt run
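If both clients end up in the same sbt project, run will prompt for which main class to launch. We can also pick one explicitly with runMain (assuming the objects sit in the default package, as above):

sbt "runMain ConfigProducer"
sbt "runMain ConfigConsumer"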
Producer output:
*** Starting Config Producer ***
### Stopping Config Producer ###
Consumer output:
*** Starting Config Consumer ***
offset = 0, key = key-1, value = value-1
offset = 1, key = key-2, value = value-2
offset = 2, key = key-3, value = value-3
offset = 3, key = key-4, value = value-4
offset = 4, key = key-5, value = value-5
In this step we tidied up the producer and consumer a little and moved our configuration out to a config file.