Flink State Management and Checkpoint Fault Tolerance (Part 2)
State types
Flink supports two kinds of state: Keyed State and Operator State. Each kind comes in two forms: raw state and managed state.
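Raw state: the user manages the state's concrete data structure. When the framework takes a checkpoint, it reads and writes the state content as byte[] and knows nothing about its internal structure. Raw state is typically used when implementing a user-defined operator.
Managed state: state managed by the Flink framework. For state on a DataStream, managed state is generally recommended.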
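Keyed State
Keyed state is state on a KeyedStream, that is, on a stream produced by stream.keyBy(...). It is bound to a specific key: every key on the KeyedStream has its own state.
Data structures:
ValueState<T>: a single value of type T. It is bound to the current key and is the simplest kind of state. update() sets the value and value() reads it.
ListState<T>: the state for a key is a list. add() appends a value to the list, and get() returns an Iterable for traversing the stored values.
ReducingState<T>: the user supplies a reduceFunction. Every call to add() invokes the reduceFunction, so all added values are merged into one single state value.
MapState<UK, UV>: the state for a key is a map. Entries are added with put() or putAll().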
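The per-key scoping described above can be pictured with a small plain-Java model. This is only a sketch, not Flink's API: the classes PerKeyValue and PerKeyReducing are hypothetical stand-ins for ValueState and ReducingState, and the key is passed explicitly, whereas Flink tracks the current key itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model of keyed state; NOT Flink's API. All names here are hypothetical.
class KeyedStateSketch {

    // Models ValueState<T>: one value per key, read with value(), written with update().
    static class PerKeyValue<K, T> {
        private final Map<K, T> values = new HashMap<>();

        T value(K currentKey) {
            return values.get(currentKey); // null until the key has been updated
        }

        void update(K currentKey, T newValue) {
            values.put(currentKey, newValue);
        }
    }

    // Models ReducingState<T>: add() folds each new element into one value per key.
    static class PerKeyReducing<K, T> {
        private final Map<K, T> values = new HashMap<>();
        private final BinaryOperator<T> reduceFunction;

        PerKeyReducing(BinaryOperator<T> reduceFunction) {
            this.reduceFunction = reduceFunction;
        }

        void add(K currentKey, T element) {
            // First element for a key is stored as-is; later ones are merged.
            values.merge(currentKey, element, reduceFunction);
        }

        T get(K currentKey) {
            return values.get(currentKey);
        }
    }

    public static void main(String[] args) {
        PerKeyValue<String, Integer> last = new PerKeyValue<>();
        PerKeyReducing<String, Integer> sum = new PerKeyReducing<>(Integer::sum);

        String[] keys = {"a", "b", "a"};
        int[] amounts = {1, 10, 2};
        for (int i = 0; i < keys.length; i++) {
            last.update(keys[i], amounts[i]);
            sum.add(keys[i], amounts[i]);
        }
        System.out.println("a: last=" + last.value("a") + ", sum=" + sum.get("a"));
        System.out.println("b: last=" + last.value("b") + ", sum=" + sum.get("b"));
    }
}
```

Records for key "a" never touch the values stored for key "b", which is the property the keyed state types share.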
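With ValueState, for example, value() reads the current value and update() writes a new one. Keyed state is obtained from the RuntimeContext, so the function that uses it must extend a RichFunction class: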
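    // Imports added so the example compiles; the class name KeyedStateDemo is illustrative.
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class KeyedStateDemo {

        // Used for printing only; visible to the window function because the demo runs in one local JVM.
        private static int sourceCount = 0;

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStateBackend(new FsStateBackend("file:///D://hadoop//data//checkpoint"));
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            env.getCheckpointConfig().setCheckpointInterval(1000);

            DataStream<Tuple3<Integer, String, Integer>> source = env.addSource(new KeyedStateSource());
            source.keyBy(0)
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
                    .apply(new KeyedStateRichFunction());
            env.execute();
        }

        private static class KeyedStateSource implements SourceFunction<Tuple3<Integer, String, Integer>> {
            private volatile boolean isRunning = true;
            private int count = 0; // plain field, not checkpointed

            @Override
            public void run(SourceContext<Tuple3<Integer, String, Integer>> sourceContext) throws Exception {
                while (isRunning) {
                    for (int i = 0; i < 10; i++) {
                        sourceContext.collect(Tuple3.of(1, "ahah", count));
                        count++;
                    }
                    if (count > 100) {
                        // Fail on purpose so that the job restarts from the last checkpoint.
                        System.out.println("err_________________");
                        throw new Exception("123");
                    }
                    sourceCount = count;
                    Thread.sleep(2000);
                }
            }

            @Override
            public void cancel() {
                isRunning = false; // the original left this empty, so the loop could never stop
            }
        }

        private static class KeyedStateRichFunction
                extends RichWindowFunction<Tuple3<Integer, String, Integer>, Integer, Tuple, TimeWindow> {
            private transient ValueState<Integer> state;

            @Override
            public void apply(Tuple tuple, TimeWindow timeWindow,
                              Iterable<Tuple3<Integer, String, Integer>> iterable,
                              Collector<Integer> collector) throws Exception {
                int count = state.value(); // the descriptor's default value is 0
                for (Tuple3<Integer, String, Integer> item : iterable) {
                    count++;
                }
                state.update(count);
                // Labels aligned with the output shown below.
                System.out.println("all count:" + count + " source count:" + sourceCount);
                collector.collect(count);
            }

            @Override
            public void open(Configuration parameters) throws Exception {
                ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<Integer>(
                        "average", TypeInformation.of(new TypeHint<Integer>() {}), 0);
                state = getRuntimeContext().getState(descriptor);
            }
        }
    }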
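How the code works:
The source emits 10 records per round, incrementing count once per record, and then sleeps for 2s. Once count exceeds 100 it throws an exception, which interrupts the source and makes the job start over.
The window size is set to 2s so that each round triggers one computation. The goal is to observe whether, after each thrown exception, the computation continues from the previous result.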
Output:
all count:10 source count:10
all count:20 source count:20
all count:30 source count:30
all count:40 source count:40
all count:50 source count:50
all count:60 source count:60
all count:70 source count:70
all count:80 source count:80
all count:90 source count:90
all count:100 source count:100
err_________________
all count:110 source count:10
all count:120 source count:20
The result is as intended: after the restart, all count does not start again from 0 but continues from the previous result. source count, in contrast, is a plain field of the source that is not part of any checkpoint, so it starts over.
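The difference between the two counters in that output can be reproduced with a plain-Java simulation. This is a minimal sketch under simplifying assumptions, not Flink code: the class CheckpointRestoreSketch and its methods are hypothetical, a checkpoint is modeled as a copy of a map, and a checkpoint is assumed to complete after every batch.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation of checkpoint and restore; NOT Flink's API. Names are hypothetical.
class CheckpointRestoreSketch {

    // Stands in for the keyed ValueState<Integer> used by the window function.
    private Map<Integer, Integer> keyedCounts = new HashMap<>();
    // Stands in for the last completed checkpoint of that state.
    private Map<Integer, Integer> lastCheckpoint = new HashMap<>();
    // Plain field of the source: never checkpointed, so it is lost on restart.
    private int sourceCount = 0;

    void processBatch(int key, int elements) {
        sourceCount += elements;
        keyedCounts.merge(key, elements, Integer::sum);
    }

    void checkpoint() {
        // Copy the map so that later updates do not leak into the checkpoint.
        lastCheckpoint = new HashMap<>(keyedCounts);
    }

    void failAndRestart() {
        keyedCounts = new HashMap<>(lastCheckpoint); // managed state comes back
        sourceCount = 0;                             // unmanaged field starts over
    }

    int allCount(int key) {
        return keyedCounts.getOrDefault(key, 0);
    }

    int sourceCount() {
        return sourceCount;
    }

    public static void main(String[] args) {
        CheckpointRestoreSketch job = new CheckpointRestoreSketch();
        for (int round = 0; round < 10; round++) {
            job.processBatch(1, 10);
            job.checkpoint();
        }
        job.failAndRestart();
        job.processBatch(1, 10);
        System.out.println("all count:" + job.allCount(1) + " source count:" + job.sourceCount());
    }
}
```

Only what was copied into the checkpoint survives the simulated failure, which is why the keyed count keeps growing while the source's own counter resets.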
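Operator State
Operator state is not tied to a key: each parallel instance of an operator holds one state. It supports fewer data structures than keyed state, essentially only ListState. Flink's Kafka connector is an example of operator state: each connector instance stores the (partition, offset) mapping for all topic partitions that instance consumes.
There are two ways to use it: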
CheckpointedFunction (rarely used; the state must be initialized by hand):
    void snapshotState(FunctionSnapshotContext context) throws Exception;
    void initializeState(FunctionInitializationContext context) throws Exception;
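    // Imports added so the example compiles.
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;

    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction {
        private final int threshold;
        private transient ListState<Tuple2<String, Integer>> checkpointedState;
        private List<Tuple2<String, Integer>> bufferedElements;

        public BufferingSink(int threshold) {
            this.threshold = threshold;
            this.bufferedElements = new ArrayList<>();
        }

        @Override
        public void invoke(Tuple2<String, Integer> value) throws Exception {
            bufferedElements.add(value);
            if (bufferedElements.size() == threshold) {
                for (Tuple2<String, Integer> element : bufferedElements) {
                    // send the element to the external system here (left empty in the original)
                }
                bufferedElements.clear();
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // Replace the previous snapshot with the elements currently buffered.
            checkpointedState.clear();
            for (Tuple2<String, Integer> element : bufferedElements) {
                checkpointedState.add(element);
            }
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Tuple2<String, Integer>> descriptor = new ListStateDescriptor<>(
                    "buffered-elements",
                    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
            checkpointedState = context.getOperatorStateStore().getListState(descriptor);
            // On recovery, refill the in-memory buffer from the restored state.
            if (context.isRestored()) {
                for (Tuple2<String, Integer> element : checkpointedState.get()) {
                    bufferedElements.add(element);
                }
            }
        }
    }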
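ListCheckpointed (commonly used; Flink initializes the state automatically):
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
    void restoreState(List<T> state) throws Exception;
ListCheckpointed supports list state only. It is a restricted variant of CheckpointedFunction that supports only list-style state with the even-split redistribution scheme. It defines two methods, snapshotState and restoreState. snapshotState is called when the master triggers a checkpoint, and the user returns the current state from it. restoreState is called during failure recovery; it receives the state as a List, and the method can restore that state into local fields. Note that newer Flink releases mark ListCheckpointed as deprecated in favor of CheckpointedFunction.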
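The snapshot and restore contract just described can be sketched without any Flink dependency. The interface ListSnapshot below is a hypothetical stand-in that only mirrors the shape of the two ListCheckpointed methods, and CountingSource is an illustrative class. The sketch assumes that the restored list may be empty or may hold several entries once the parallelism has changed, so it folds over the whole list rather than reading only the first element.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java sketch of the snapshot/restore contract; NOT Flink's interface. Names are hypothetical.
class ListSnapshotSketch {

    // Mirrors the shape of the two ListCheckpointed methods, minus the Flink types.
    interface ListSnapshot<T> {
        List<T> snapshotState(long checkpointId, long timestamp);

        void restoreState(List<T> state);
    }

    static class CountingSource implements ListSnapshot<Integer> {
        private int count = 0;

        void emit(int elements) {
            count += elements;
        }

        int count() {
            return count;
        }

        @Override
        public List<Integer> snapshotState(long checkpointId, long timestamp) {
            // Hand the current counter to the framework as a one-element list.
            return Collections.singletonList(count);
        }

        @Override
        public void restoreState(List<Integer> state) {
            // Assumption: the list can be empty or contain several partial counters,
            // so add them all up instead of calling state.get(0).
            count = 0;
            for (int part : state) {
                count += part;
            }
        }
    }

    public static void main(String[] args) {
        CountingSource before = new CountingSource();
        before.emit(100);
        List<Integer> snapshot = before.snapshotState(1L, System.currentTimeMillis());

        CountingSource after = new CountingSource();
        after.restoreState(snapshot);
        System.out.println("restored count: " + after.count());

        // Two old subtasks merged into one new subtask.
        after.restoreState(Arrays.asList(40, 60));
        System.out.println("merged count: " + after.count());
    }
}
```

Whether summing is the right merge depends on what the list elements mean; for a counter it is, for offsets or buffers a different merge would be needed.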
    // Imports added so the example compiles; the class name OperatorStateDemo is illustrative.
    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class OperatorStateDemo {

        // Used for printing only; visible to the window function because the demo runs in one local JVM.
        private static int sourceCount = 0;

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStateBackend(new FsStateBackend("file:///D://hadoop//data//checkpoint"));
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            env.getCheckpointConfig().setCheckpointInterval(1000);

            DataStream<Tuple4<Integer, String, String, Integer>> source = env.addSource(new OperatorStateSource());
            source.keyBy(0)
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
                    .apply(new OperatorStateApply());
            env.execute("");
        }

        private static class OperatorStateSource extends RichSourceFunction<Tuple4<Integer, String, String, Integer>>
                implements ListCheckpointed<UserState> {
            private int count = 0;
            private volatile boolean isRunning = true;

            @Override
            public List<UserState> snapshotState(long checkpointId, long timestamp) throws Exception {
                List<UserState> userStateList = new ArrayList<>();
                UserState state = new UserState();
                state.setCount(count);
                userStateList.add(state);
                return userStateList;
            }

            @Override
            public void restoreState(List<UserState> list) throws Exception {
                if (!list.isEmpty()) { // guard added: the restored list may be empty
                    count = list.get(0).getCount();
                }
                System.out.println("OperatorStateSource restoreState: " + count);
            }

            @Override
            public void run(SourceContext<Tuple4<Integer, String, String, Integer>> sourceContext) throws Exception {
                while (isRunning) {
                    for (int i = 0; i < 10; i++) {
                        sourceContext.collect(Tuple4.of(1, "hello-" + count, "alphabet", count));
                        count++;
                    }
                    sourceCount = count;
                    Thread.sleep(2000);
                    if (count >= 100) {
                        // Fail on purpose so that the job restarts from the last checkpoint.
                        System.out.println("err_________________");
                        throw new Exception("exception made by ourself!");
                    }
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        }

        // Renamed from OperatorStateAppy (typo in the original).
        private static class OperatorStateApply
                implements WindowFunction<Tuple4<Integer, String, String, Integer>, Integer, Tuple, TimeWindow>,
                ListCheckpointed<UserState> {
            private int total = 0;

            @Override
            public List<UserState> snapshotState(long checkpointId, long timestamp) throws Exception {
                List<UserState> userStateList = new ArrayList<>();
                UserState state = new UserState();
                state.setCount(total);
                userStateList.add(state);
                return userStateList;
            }

            @Override
            public void restoreState(List<UserState> list) throws Exception {
                if (!list.isEmpty()) {
                    total = list.get(0).getCount();
                }
            }

            @Override
            public void apply(Tuple tuple, TimeWindow timeWindow,
                              Iterable<Tuple4<Integer, String, String, Integer>> iterable,
                              Collector<Integer> collector) throws Exception {
                int count = 0;
                for (Tuple4<Integer, String, String, Integer> data : iterable) {
                    count++;
                }
                total = total + count;
                System.out.println("all count:" + total + " source count:" + sourceCount);
                collector.collect(total);
            }
        }

        static class UserState implements Serializable {
            private int count;

            public int getCount() {
                return count;
            }

            public void setCount(int count) {
                this.count = count;
            }
        }
    }
Output:
all count:10 source count:10
all count:20 source count:20
all count:30 source count:30
all count:40 source count:40
all count:50 source count:50
all count:60 source count:60
all count:70 source count:70
all count:80 source count:80
all count:90 source count:90
all count:100 source count:100
err_________________
OperatorStateSource restoreState: 100
all count:110 source count:110
The result is as intended: after the source was interrupted, the restore method was called and the state value was recovered, so this time source count also continues from 100.
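Summary:
The difference between the two approaches is as follows. When implementing the CheckpointedFunction interface, two ListState accessors are available, getListState and getUnionListState. Both return a ListState, but they behave differently when state is redistributed, as described in detail later. When implementing the ListCheckpointed interface directly, the use of list state is prescribed and no initialization is needed on our part; Flink takes care of it internally.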
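State redistribution
When the parallelism of an operator in a job is changed, how should the previous state be assigned to the new instances? The following uses ListState, union ListState and BroadcastState to explain how redistribution works.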
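As a rough picture of the two list-state schemes, the following plain-Java sketch redistributes the state of two old subtasks across three new ones. It is not Flink code: the class and method names are hypothetical, and dealing the elements out round-robin is an assumption made for illustration, since the only property relied on here is that even-split gives each new subtask a disjoint share while union gives every new subtask the complete list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of list-state redistribution; NOT Flink's implementation. Names are hypothetical.
class RedistributionSketch {

    static <T> List<List<T>> emptyLists(int n) {
        List<List<T>> lists = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            lists.add(new ArrayList<>());
        }
        return lists;
    }

    // Even-split: pool all elements, then give each new subtask a disjoint share.
    // Round-robin order is an assumption of this sketch.
    static <T> List<List<T>> evenSplit(List<List<T>> oldSubtasks, int newParallelism) {
        List<List<T>> result = emptyLists(newParallelism);
        int next = 0;
        for (List<T> old : oldSubtasks) {
            for (T element : old) {
                result.get(next % newParallelism).add(element);
                next++;
            }
        }
        return result;
    }

    // Union: every new subtask receives the complete list of all elements.
    static <T> List<List<T>> union(List<List<T>> oldSubtasks, int newParallelism) {
        List<T> all = new ArrayList<>();
        for (List<T> old : oldSubtasks) {
            all.addAll(old);
        }
        List<List<T>> result = emptyLists(newParallelism);
        for (List<T> subtask : result) {
            subtask.addAll(all);
        }
        return result;
    }

    public static void main(String[] args) {
        // Two old subtasks, each holding two state elements.
        List<List<String>> old = Arrays.asList(
                Arrays.asList("p0", "p1"),
                Arrays.asList("p2", "p3"));
        System.out.println("even-split: " + evenSplit(old, 3));
        System.out.println("union:      " + union(old, 3));
    }
}
```

With union state each subtask has to decide for itself which of the received elements it actually needs, which is the price for seeing everything.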
Original: https://github.com/heibaiying/BigData-Notes