Class DefaultStreamsExecutionEnvironment
- java.lang.Object
  - io.streamthoughts.azkarra.runtime.env.DefaultStreamsExecutionEnvironment

All Implemented Interfaces: AzkarraContextAware, StreamsExecutionEnvironment

public class DefaultStreamsExecutionEnvironment extends Object implements StreamsExecutionEnvironment, AzkarraContextAware

The default StreamsExecutionEnvironment implementation.
-
-
Constructor Summary
DefaultStreamsExecutionEnvironment(Conf configuration)
    Creates a new DefaultStreamsExecutionEnvironment instance.
-
Method Summary
StreamsExecutionEnvironment addFallbackConfiguration(Conf fallback)
    Adds a configuration that is used as a fallback for settings not present in the configuration defined for this environment.
StreamsExecutionEnvironment addGlobalStateListener(org.apache.kafka.streams.processor.StateRestoreListener listener)
    Adds a StateRestoreListener instance that will be set on all KafkaStreams instances created in this StreamsExecutionEnvironment.
StreamsExecutionEnvironment addStateListener(org.apache.kafka.streams.KafkaStreams.StateListener listener)
    Adds a KafkaStreams.StateListener instance that will be set on all KafkaStreams instances created in this StreamsExecutionEnvironment.
StreamsExecutionEnvironment addStreamsLifecycleInterceptor(Supplier<StreamsLifecycleInterceptor> interceptor)
    Adds a streams interceptor that will be set on all KafkaStreams instances created in this StreamsExecutionEnvironment.
ApplicationId addTopology(Supplier<TopologyProvider> provider)
    Adds a new TopologyProvider instance to this StreamsExecutionEnvironment to be started.
ApplicationId addTopology(Supplier<TopologyProvider> provider, Executed executed)
    Adds a new TopologyProvider instance to this StreamsExecutionEnvironment to be started.
Collection<KafkaStreamsContainer> applications()
    Returns all started KafkaStreams applications.
static StreamsExecutionEnvironment create()
    Static helper that creates a new StreamsExecutionEnvironment instance using an empty configuration and a generated unique name.
static StreamsExecutionEnvironment create(Conf settings)
    Static helper that creates a new StreamsExecutionEnvironment instance from the specified Conf and using a generated env name.
static StreamsExecutionEnvironment create(Conf settings, String envName)
    Static helper that creates a new StreamsExecutionEnvironment instance from the specified Conf and env name.
static StreamsExecutionEnvironment create(String envName)
    Static helper that creates a new StreamsExecutionEnvironment instance from the specified env name and using the configuration.
Conf getConfiguration()
    Gets the configuration of this environment.
Supplier<StreamThreadExceptionHandler> getStreamThreadExceptionHandler()
    Gets the StreamThreadExceptionHandler.
String name()
    Gets the name of this StreamsExecutionEnvironment.
void remove(ApplicationId id, Duration timeout)
    Stops the streams instance for the specified application id and removes the associated topology from this environment.
StreamsExecutionEnvironment setApplicationIdBuilder(Supplier<ApplicationIdBuilder> supplier)
    Sets the ApplicationIdBuilder that should be used for building the streams application.id.
void setAzkarraContext(AzkarraContext context)
    Sets the AzkarraContext that this object runs in.
StreamsExecutionEnvironment setConfiguration(Conf configuration)
    Sets the configuration of this environment.
StreamsExecutionEnvironment setKafkaStreamsFactory(Supplier<KafkaStreamsFactory> kafkaStreamsFactory)
    Sets the KafkaStreamsFactory that will be used to provide the KafkaStreams to configure and start.
StreamsExecutionEnvironment setRocksDBConfig(RocksDBConfig rocksDBConfig)
    Sets the RocksDBConfig configuration used by topology persistent stores.
StreamsExecutionEnvironment setStreamThreadExceptionHandler(Supplier<StreamThreadExceptionHandler> handler)
    Sets the StreamThreadExceptionHandler invoked when a StreamThread abruptly terminates due to an uncaught exception.
StreamsExecutionEnvironment setWaitForTopicsToBeCreated(boolean waitForTopicToBeCreated)
    Sets whether the streams instances should wait for source topics to be created before starting.
void start()
    Starts this StreamsExecutionEnvironment instance.
State state()
    Gets the state of this StreamsExecutionEnvironment.
void stop(boolean cleanUp)
    Stops this StreamsExecutionEnvironment instance and all running KafkaStreams instances.
void stop(ApplicationId id, boolean cleanUp, Duration timeout)
    Stops the streams instance for the specified application id.
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface io.streamthoughts.azkarra.api.StreamsExecutionEnvironment
remove, stop, stop
-
Constructor Detail
-
DefaultStreamsExecutionEnvironment
public DefaultStreamsExecutionEnvironment(Conf configuration)
Creates a new DefaultStreamsExecutionEnvironment instance.
Parameters:
    configuration - the default Conf instance.
-
-
Method Detail
-
create
public static StreamsExecutionEnvironment create()
Static helper that creates a new StreamsExecutionEnvironment instance using an empty configuration and a generated unique name.
Returns: a new StreamsExecutionEnvironment instance.
-
create
public static StreamsExecutionEnvironment create(String envName)
Static helper that creates a new StreamsExecutionEnvironment instance from the specified env name and using the configuration.
Parameters:
    envName - the name to be used for identifying this environment.
Returns: a new StreamsExecutionEnvironment instance.
-
create
public static StreamsExecutionEnvironment create(Conf settings)
Static helper that creates a new StreamsExecutionEnvironment instance from the specified Conf and using a generated env name.
Parameters:
    settings - the Conf instance.
Returns: a new StreamsExecutionEnvironment instance.
-
create
public static StreamsExecutionEnvironment create(Conf settings, String envName)
Static helper that creates a new StreamsExecutionEnvironment instance from the specified Conf and env name.
Parameters:
    settings - the Conf instance.
    envName - the name to be used for identifying this environment.
Returns: a new StreamsExecutionEnvironment instance.
-
name
public String name()
Gets the name of this StreamsExecutionEnvironment.
Specified by: name in interface StreamsExecutionEnvironment
Returns: the string name.
-
state
public State state()
Gets the state of this StreamsExecutionEnvironment.
Specified by: state in interface StreamsExecutionEnvironment
Returns: the State.
-
addStateListener
public StreamsExecutionEnvironment addStateListener(org.apache.kafka.streams.KafkaStreams.StateListener listener)
Adds a KafkaStreams.StateListener instance that will be set on all KafkaStreams instances created in this StreamsExecutionEnvironment.
Specified by: addStateListener in interface StreamsExecutionEnvironment
Parameters:
    listener - the KafkaStreams.StateListener instance.
Returns: this StreamsExecutionEnvironment instance.
-
addGlobalStateListener
public StreamsExecutionEnvironment addGlobalStateListener(org.apache.kafka.streams.processor.StateRestoreListener listener)
Adds a StateRestoreListener instance that will be set on all KafkaStreams instances created in this StreamsExecutionEnvironment.
Specified by: addGlobalStateListener in interface StreamsExecutionEnvironment
Parameters:
    listener - the StateRestoreListener instance.
Returns: this StreamsExecutionEnvironment instance.
-
addStreamsLifecycleInterceptor
public StreamsExecutionEnvironment addStreamsLifecycleInterceptor(Supplier<StreamsLifecycleInterceptor> interceptor)
Adds a streams interceptor that will be set on all KafkaStreams instances created in this StreamsExecutionEnvironment.
Specified by: addStreamsLifecycleInterceptor in interface StreamsExecutionEnvironment
Parameters:
    interceptor - the StreamsLifecycleInterceptor.
Returns: this StreamsExecutionEnvironment instance.
-
setStreamThreadExceptionHandler
public StreamsExecutionEnvironment setStreamThreadExceptionHandler(Supplier<StreamThreadExceptionHandler> handler)
Sets the StreamThreadExceptionHandler invoked when a StreamThread abruptly terminates due to an uncaught exception.
Specified by: setStreamThreadExceptionHandler in interface StreamsExecutionEnvironment
Parameters:
    handler - the StreamThreadExceptionHandler.
Returns: this StreamsExecutionEnvironment instance.
See Also: KafkaStreams.setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler)
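
The See Also entry above points at the JDK's Thread.UncaughtExceptionHandler contract. As a rough illustration of when such a handler fires, the following sketch uses only plain JDK threads. The class name UncaughtHandlerSketch, the thread name, and the runAndCapture helper are hypothetical, and nothing here is taken from the Azkarra or Kafka Streams implementation.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in: a plain JDK thread plays the role of a stream thread.
final class UncaughtHandlerSketch {

    // Starts a thread that fails with an uncaught exception and returns the
    // text recorded by the handler installed on that thread.
    static String runAndCapture(String failureMessage) {
        AtomicReference<String> captured = new AtomicReference<>();
        Thread worker = new Thread(() -> {
            throw new IllegalStateException(failureMessage);
        }, "stand-in-stream-thread");
        // The JVM calls this handler on the failing thread just before it terminates.
        worker.setUncaughtExceptionHandler((thread, error) ->
                captured.set(thread.getName() + ": " + error.getMessage()));
        worker.start();
        try {
            // join() returns only after the thread, including its handler, has finished.
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the worker", e);
        }
        return captured.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndCapture("boom")); // prints "stand-in-stream-thread: boom"
    }
}
```

A handler of this kind is a natural place to log the failure or decide whether the surrounding application should shut down; what the Azkarra handler actually does with the exception is not specified on this page.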
-
getStreamThreadExceptionHandler
public Supplier<StreamThreadExceptionHandler> getStreamThreadExceptionHandler()
Gets the StreamThreadExceptionHandler.
Specified by: getStreamThreadExceptionHandler in interface StreamsExecutionEnvironment
Returns: the Supplier, or null if no handler is set.
-
applications
public Collection<KafkaStreamsContainer> applications()
Returns all started KafkaStreams applications.
Specified by: applications in interface StreamsExecutionEnvironment
Returns: a collection of KafkaStreamsContainer applications.
-
setConfiguration
public StreamsExecutionEnvironment setConfiguration(Conf configuration)
Sets the configuration of this environment.
Specified by: setConfiguration in interface StreamsExecutionEnvironment
Returns: this StreamsExecutionEnvironment instance.
-
getConfiguration
public Conf getConfiguration()
Gets the configuration of this environment.
Specified by: getConfiguration in interface StreamsExecutionEnvironment
Returns: the Conf instance.
-
setRocksDBConfig
public StreamsExecutionEnvironment setRocksDBConfig(RocksDBConfig rocksDBConfig)
Sets the RocksDBConfig configuration used by topology persistent stores.
Specified by: setRocksDBConfig in interface StreamsExecutionEnvironment
Parameters:
    rocksDBConfig - the RocksDBConfig instance.
Returns: this StreamsExecutionEnvironment instance.
-
setApplicationIdBuilder
public StreamsExecutionEnvironment setApplicationIdBuilder(Supplier<ApplicationIdBuilder> supplier)
Sets the ApplicationIdBuilder that should be used for building the streams application.id.
Specified by: setApplicationIdBuilder in interface StreamsExecutionEnvironment
Parameters:
    supplier - the ApplicationIdBuilder instance supplier.
Returns: this StreamsExecutionEnvironment instance.
-
setWaitForTopicsToBeCreated
public StreamsExecutionEnvironment setWaitForTopicsToBeCreated(boolean waitForTopicToBeCreated)
Sets whether the streams instances should wait for source topics to be created before starting. If some source topics are missing at startup, a streams instance fails.
Specified by: setWaitForTopicsToBeCreated in interface StreamsExecutionEnvironment
Parameters:
    waitForTopicToBeCreated - whether to wait for topics to be created.
Returns: this StreamsExecutionEnvironment instance.
-
addTopology
public ApplicationId addTopology(Supplier<TopologyProvider> provider)
Adds a new TopologyProvider instance to this StreamsExecutionEnvironment to be started.
Specified by: addTopology in interface StreamsExecutionEnvironment
Parameters:
    provider - the TopologyProvider supplier.
Returns: the ApplicationId if the environment is already started, otherwise null.
-
addTopology
public ApplicationId addTopology(Supplier<TopologyProvider> provider, Executed executed)
Adds a new TopologyProvider instance to this StreamsExecutionEnvironment to be started.
Specified by: addTopology in interface StreamsExecutionEnvironment
Parameters:
    provider - the TopologyProvider supplier.
    executed - the Executed instance.
Returns: the ApplicationId if the environment is already started, otherwise null.
-
start
public void start() throws IllegalStateException, AzkarraException
Starts this StreamsExecutionEnvironment instance.
Specified by: start in interface StreamsExecutionEnvironment
Throws:
    IllegalStateException
    AzkarraException
-
stop
public void stop(boolean cleanUp)
Stops this StreamsExecutionEnvironment instance and all running KafkaStreams instances.
Specified by: stop in interface StreamsExecutionEnvironment
Parameters:
    cleanUp - whether the local state of each KafkaStreams instance must be cleaned up.
-
stop
public void stop(ApplicationId id, boolean cleanUp, Duration timeout)
Stops the streams instance for the specified application id.
Specified by: stop in interface StreamsExecutionEnvironment
Parameters:
    id - the ApplicationId of the streams instance.
    cleanUp - whether the local state of the KafkaStreams instance must be cleaned up.
    timeout - the duration to wait for the streams instance to shut down.
-
remove
public void remove(ApplicationId id, Duration timeout)
Stops the streams instance for the specified application id and removes the associated topology from this environment.
Specified by: remove in interface StreamsExecutionEnvironment
Parameters:
    id - the ApplicationId of the streams instance.
    timeout - the duration to wait for the streams instance to shut down.
-
addFallbackConfiguration
public StreamsExecutionEnvironment addFallbackConfiguration(Conf fallback)
Adds a configuration that is used as a fallback for settings not present in the configuration defined for this environment.
Specified by: addFallbackConfiguration in interface StreamsExecutionEnvironment
Parameters:
    fallback - the Conf instance.
Returns: this StreamsExecutionEnvironment instance.
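
To make the fallback rule concrete, here is a minimal sketch of the lookup order it describes, assuming a fallback value is consulted only when the environment configuration has no entry for a key. It models configurations as plain java.util.Map instances. FallbackLookupSketch and resolve are hypothetical names, and the sketch does not reflect how Azkarra's Conf is implemented.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in: not Azkarra's Conf, only a model of the lookup order.
final class FallbackLookupSketch {

    // Returns the environment value when the key is defined there, otherwise
    // the fallback value, or an empty Optional when neither defines the key.
    static Optional<String> resolve(Map<String, String> environment,
                                    Map<String, String> fallback,
                                    String key) {
        if (environment.containsKey(key)) {
            return Optional.ofNullable(environment.get(key));
        }
        return Optional.ofNullable(fallback.get(key));
    }

    public static void main(String[] args) {
        Map<String, String> environment = Map.of("bootstrap.servers", "broker-a:9092");
        Map<String, String> fallback = Map.of(
                "bootstrap.servers", "localhost:9092",
                "num.stream.threads", "2");

        // Defined in the environment, so the fallback entry is ignored.
        System.out.println(resolve(environment, fallback, "bootstrap.servers").orElse("<unset>")); // prints "broker-a:9092"
        // Missing from the environment, so the fallback entry is used.
        System.out.println(resolve(environment, fallback, "num.stream.threads").orElse("<unset>")); // prints "2"
    }
}
```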
-
setKafkaStreamsFactory
public StreamsExecutionEnvironment setKafkaStreamsFactory(Supplier<KafkaStreamsFactory> kafkaStreamsFactory)
Sets the KafkaStreamsFactory that will be used to provide the KafkaStreams to configure and start.
Specified by: setKafkaStreamsFactory in interface StreamsExecutionEnvironment
Parameters:
    kafkaStreamsFactory - the KafkaStreamsFactory instance.
Returns: this StreamsExecutionEnvironment instance.
-
setAzkarraContext
public void setAzkarraContext(AzkarraContext context)
Sets the AzkarraContext that this object runs in.
Specified by: setAzkarraContext in interface AzkarraContextAware
Parameters:
    context - the AzkarraContext instance.