StreamsExecutionEnvironment

The execution container that handles all the code logic to run and manage KafkaStreams instances.

1 Creating a new StreamsExecutionEnvironment

A StreamExecutionEnvironment is the interface for creating, configuring and starting new KafkaStreams instances. Basically, a StreamExecutionEnvironment is an execution container that handles all the code logic to run and manage your Topology objects.

Azkarra provides the default implementation DefaultStreamsExecutionEnvironment that you can instantiate as follows :

StreamsExecutionEnvironment env = DefaultStreamsExecutionEnvironment.create()

2 Configuring a StreamsExecutionEnvironment

Each StreamsExecutionEnvironment can be configured with any property of your choice. The configuration can be passed directly while creating a new instance:

Map<String, Object> props = new HashMap<>(); 
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); (1)

Conf streamsConfig = Conf.of("streams", props) (2)

StreamsExecutionEnvironment env = DefaultStreamsExecutionEnvironment.create(streamsConfig); (3)

or using the setConfiguration method:

env.setConfiguration(Conf.of("streams", props)) (4)
  1. Create the Map to be used for configuring the StreamsExecutionEnvironment.
  2. The streams properties must be prefixed with streams..
  3. The configuration is passed to create method.
  4. Optionally, the configuration is setted using the setConfiguration method.

3 Registering TopologyProvider

You can register multiple TopologyProvider to an StreamsExecutionEnvironment using the addTopology method.

env.addTopology(
    WordCountTopology::new,   (1)
    Executed.as("wordcount")  (2)
    );
  1. The TopologyProvider accepts a Supplier<TopologyProvider> object.
  2. The Executed class can be used to name and describe a Topology. For example, the name can be used to auto-generate an application.id if no one is configured.

In addition, the Executed class allows overriding environment properties :

env.addTopology(
    WordCountTopology::new,
    Executed.as("wordcount")
            .withDescription("Basic WordCount Topology")
            .withConfig(Conf.with("streams.application.id", "my-word-count-application"))
    );

4 Using multiple environments

A StreamsExecutionEnvironment allows setting common configuration properties, listeners and behaviors to a set of KafkaStreams instances.

For most usages, you will usually only create a single StreamsExecutionEnvironment instance. But, there is no restriction on the number of instances you can create. Having multiple StreamsExecutionEnvironment can be useful.

For example, you may wish to execute a KafkaStreams application on two distinct Apache Kafka clusters. An StreamsExecutionEnvironment is uniquely identified by its name.

StreamsExecutionEnvironment env1 = DefaultStreamsExecutionEnvironment.create(
    Conf.with("streams", confEnv1), "prod-eu-west-1");

StreamsExecutionEnvironment env2 = DefaultStreamsExecutionEnvironment.create(
    Conf.with("streams", confEnv2), "prod-eu-west-2");

5 Automatically generating the application.id property

The ApplicationIdBuilder is the interface that can be used to automatically generate the application.id property to be set to a KafkaStreams instance.

Azkarra provides the DefaultApplicationIdBuilder implementation that will generates an identifier only if no one is already configured. The generated application is in the form: <env_name>-<topology_name>-<topology-version>.

It’s also possible to provide a custom ApplicationIdBuilder using the StreamsExecutionEnvironment#setApplicationIdBuilder method.

env.setApplicationIdBuilder(() -> new ApplicationIdBuilder() {
    @Override
    public ApplicationId buildApplicationId(final TopologyMetadata metadata, final Conf streamsConfig) {
        return new ApplicationId(metadata.name() + metadata.version());
    }
});

6 Customizing KafkaStreams instance

By default, Azkarra is responsible for creating a new KafkaStreams instance for each provided Topology.

But, in some cases, you may want to be able to customize how the KafkaStreams instances are created.

For example, it may be to provide a KafkaClientSupplier that will add some tracing mechanisms on top of the Kafka clients (e.g: kafka-opentracing.

Azkarra provides the KafkaStreamsFactory interface, allowing you to customize how KafkaStreams instances are built.

Below is the interface :

public interface KafkaStreamsFactory {

    KafkaStreams make(final Topology topology, final Conf streamsConfig);
}

To configure a KafkaStreamsFactory for a specific environment you can use the StreamsExecutionEnvironment#setKafkaStreamsFactory method.

StreamsExecutionEnvironment env = DefaultStreamsExecutionEnvironment.create();
env.setKafkaStreamsFactory(() -> new KafkaStreamsFactory() {
    @Override
    public KafkaStreams make(final Topology topology, final Conf streamsConfig) {
        KafkaClientSupplier clientSupplier = //...
        return new KafkaStreams(topology, streamsConfig.getConfAsProperties(), clientSupplier);
    }
});

7 The StreamsExecutionEnvironmentAware interface

Azkarra provides the StreamsExecutionEnvironmentAware interface that components can implement to be notified of the StreamsExecutionEnvironment that it runs in.

Below are the list of components that currently support the StreamsExecutionEnvironmentAware interface:

  • TopologyProvider
  • ApplicationIdBuilder
  • KafkaStreamsFactory
  • StreamsLifecycleInterceptor

8 Starting an environment

Most of the time, you will never have to start an environment directly because this will be managed by the AzkarraContext or AzkarraApplication classes.

But you may have to integrate Azkarra with another framework (such as Spring, Micronaut). And, because this is usually done at the StreamsExecutionEnvironment level it can be convenient to manually invoke the method bellow:

env.start();
Last modified March 3, 2021: docs(site): update docs for 0.9.x (ca3e155b)