how about a error streaming table?
```sql
CREATE TABLE IF NOT EXISTS etl.consume_kafka1 (s String) ENGINE = Kafka()
SETTINGS kafka_format = 'CSV', kafka_group_name = 'CHgroup'
, kafka_broker_list = '127.1.1.1:9092', kafka_topic_list = '1-partition'
, kafka_handle_error_mode = 'stream'
, kafka_num_consumers=1;
CREATE MATERIALIZED VIEW IF NOT EXISTS etl.pipeline1 TO etl.trashbin AS SELECT s FROM etl.consume_kafka1
WHERE length(_error) >0
;
CREATE TABLE IF NOT EXISTS etl.consume_kafka2 (s String) ENGINE = Kafka()
SETTINGS kafka_format = 'CSV', kafka_group_name = 'CHgroup'
, kafka_broker_list = '127.2.2.2:9092', kafka_topic_list = '1-partition'
, kafka_handle_error_mode = 'stream'
, kafka_num_consumers=1;
CREATE MATERIALIZED VIEW IF NOT EXISTS etl.pipeline2 TO etl.trashbin AS SELECT s FROM etl.consume_kafka2
WHERE length(_error) >0
;
```