Хобрук: Ваш путь к мастерству в программировании

Как эффективно записывать данные из конвейера flink в redis

Я создаю конвейер в Apache flink sql api. Конвейер выполняет простой запрос проекции. Однако мне нужно записать кортежи (а именно некоторые элементы в каждом кортеже) один раз перед запросом и еще раз после запроса. Оказалось, что мой код, который я использую для написания Redis, сильно снижает производительность. Т.е. флинк оказывает обратное давление при очень малой скорости передачи данных. Что не так с моим кодом и как я могу его улучшить. Любые рекомендации, пожалуйста.

Когда я перестал писать в Redis до и после, производительность была отличной. Вот мой код конвейера:

public class QueryExample {
    public static Long throughputCounterAfter=new Long("0");
    public static void main(String[] args) {
        int k_partitions = 10;
        reamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(5 * 32);
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "zookeeper-node-01:2181");
        props.setProperty("bootstrap.servers", "kafka-node-01:9092,kafka-node-02:9092,kafka-node-03:9092");
        // not to be shared with another job consuming the same topic
        props.setProperty("group.id", "flink-group");
        props.setProperty("enable.auto.commit","false");
        FlinkKafkaConsumer011<String> purchasesConsumer=new FlinkKafkaConsumer011<String>("purchases",
                new SimpleStringSchema(),
                props);

        DataStream<String> purchasesStream = env
                .addSource(purchasesConsumer)
                .setParallelism(Math.min(5 * 32, k_partitions));
        DataStream<Tuple4<Integer, Integer, Integer, Long>> purchaseWithTimestampsAndWatermarks =
                purchasesStream
                        .flatMap(new PurchasesParser())
                        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple4<Integer, Integer, Integer, Long>>(Time.seconds(10)) {

                            @Override
                            public long extractTimestamp(Tuple4<Integer, Integer, Integer, Long> element) {
                                return element.getField(3);
                            }
                        });

        Table purchasesTable = tEnv.fromDataStream(purchaseWithTimestampsAndWatermarks, "userID, gemPackID,price, rowtime.rowtime");
        tEnv.registerTable("purchasesTable", purchasesTable);

        purchaseWithTimestampsAndWatermarks.flatMap(new WriteToRedis());
        Table result = tEnv.sqlQuery("SELECT  userID, gemPackID, rowtime from purchasesTable");
        DataStream<Tuple2<Boolean, Row>> queryResultAsDataStream = tEnv.toRetractStream(result, Row.class);
        queryResultAsDataStream.flatMap(new WriteToRedis());
        try {
            env.execute("flink SQL");

        } catch (Exception e) {
            e.printStackTrace();
        }
    }




/**
 * write to redis
 */
public static class WriteToRedis extends RichFlatMapFunction<Tuple4<Integer, Integer, Integer, Long>, String> {
    RedisReadAndWrite redisReadAndWrite;

    @Override
    public void open(Configuration parameters) {
        LOG.info("Opening connection with Jedis to {}", "redis");
        this.redisReadAndWrite = new RedisReadAndWrite("redis",6379);

    }

    @Override
    public void flatMap(Tuple4<Integer, Integer, Integer, Long> input, Collector<String> out) throws Exception {
        this.redisReadAndWrite.write(input.f0+":"+input.f3+"","time_seen", TimeUnit.NANOSECONDS.toMillis(System.nanoTime())+"");
    }
}
}


public class RedisReadAndWrite {
    private Jedis flush_jedis;

    public RedisReadAndWrite(String redisServerName , int port) {
        flush_jedis=new Jedis(redisServerName,port);
    }


    public void write(String key,String field, String value) {
        flush_jedis.hset(key,field,value);

    }
}

Дополнительная часть: я попробовал вторую реализацию функции процесса, которая пакетно записывает toredis с помощью Jedis. Однако я получаю следующую ошибку. org.apache.flink.runtime.client.JobExecutionException: redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketException: сокет не подключен. Я попытался уменьшить количество пакетных сообщений, но через некоторое время все еще получаю ошибки.

Вот реализация функции процесса:

/ ** * запись в redis с использованием функции процесса * /

public static class WriteToRedisAfterQueryProcessFn extends ProcessFunction<Tuple2<Boolean, Row>, String> {
    Long timetoFlush;
    @Override
    public void open(Configuration parameters) {
        flush_jedis=new Jedis("redis",6379,1800);
        p = flush_jedis.pipelined();
        this.timetoFlush=System.currentTimeMillis()-initialTime;
    }

    @Override
    public void processElement(Tuple2<Boolean, Row> input, Context context, Collector<String> collector) throws Exception {
        p.hset(input.f1.getField(0)+":"+new Instant(input.f1.getField(2)).getMillis()+"","time_updated",TimeUnit.NANOSECONDS.toMillis(System.nanoTime())+"");
        throughputAccomulationcount++;
        System.out.println(throughputAccomulationcount);
        if(throughputAccomulationcount==50000){
            throughputAccomulationcount=0L;
            p.sync();
        }
    }
}

Ответы:


1

Низкая производительность, которую вы испытываете, несомненно, связана с тем, что вы делаете синхронный запрос на повторное использование для каждой записи. @kkrugler уже упоминал об асинхронном вводе-выводе, который является обычным средством исправления этой ситуации. Для этого потребуется переключиться на один из клиентов Redis, поддерживающий асинхронную работу.

Другое решение, которое обычно используется при работе с внешними службами, - это группирование групп записей. С jedis вы можете использовать конвейерную обработку. Например, вы можете заменить WriteToRedis RichFlatMapFunction на ProcessFunction, которая выполняет конвейерную запись в redis пакетами определенного размера и полагается на тайм-аут для очистки своего буфера по мере необходимости. Вы можете использовать Flink ListState для буфера.

27.12.2018
  • Большое спасибо за ответ @David Anderson. Я попробовал второе предложение, как вы можете видеть в дополнительной части вопроса (реализация функции процесса). Однако я получаю следующую ошибку. org.apache.flink.runtime.client.JobExecutionException: redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketException: сокет не подключен. Я попытался уменьшить количество пакетных сообщений, но через некоторое время все еще получаю ошибки 29.12.2018
  • 50000 - это очень большой размер партии. Бьюсь об заклад, он лучше работает с 50, а не с 50000. Но независимо от размера партии вы не должны удивляться случайным ошибкам. Чтобы что-то подобное работало без перерыва, вы должны быть готовы повторить попытку, когда что-то не удастся. 29.12.2018
  • Кроме того, из-за того, как это написано, вы рискуете потерять записи при возникновении исключений. Вот почему я предложил держать их в состоянии Flink (ListState) до тех пор, пока они не будут успешно отправлены в redis. 29.12.2018
  • Большое тебе спасибо. Я сделал это как пакетирование и многопоточность без использования конвейера, и он хорошо работает. 31.12.2018

  • 2

    Обычно при записи во внешнюю службу это становится узким местом для рабочего процесса Flink. Самый простой способ повысить производительность - сделать эту часть рабочего процесса многопоточным с помощью AsyncFunction. См. эту документацию подробнее.

    - Кен

    27.12.2018
  • Спасибо, Кен. AsyncFunction - очень хорошее решение, и было приятно узнать. Я, однако, решил пакетировать и рассылать сообщения в соответствии с требованиями приложения. 31.12.2018
  • Если вы выполняете свою собственную многопоточность, обратите внимание, что это обычно затрудняет поддержку контрольных точек (и, следовательно, перезапускаемых / восстанавливаемых рабочих процессов). 01.01.2019
  • Новые материалы

    Создание кнопочного меню с использованием HTML, CSS и JavaScript
    Вы будете создавать кнопочное меню, которое имеет состояние наведения, а также позволяет вам выбирать кнопку при нажатии на нее. Финальный проект можно увидеть в этом Codepen . Шаг 1..

    Внедрите OAuth в свои веб-приложения для повышения безопасности
    OAuth — это широко распространенный стандарт авторизации, который позволяет приложениям получать доступ к ресурсам от имени пользователя, не раскрывая его пароль. Это позволяет пользователям..

    Классы в JavaScript
    class является образцом java Script Object. Конструкция «class» позволяет определять классы на основе прототипов с чистым, красивым синтаксисом. // define class Human class Human {..

    Как свинг-трейдеры могут использовать ИИ для больших выигрышей
    По мере того как все больше и больше профессиональных трейдеров и активных розничных трейдеров узнают о возможностях, которые предоставляет искусственный интеллект и машинное обучение для улучшения..

    Как построить любой стол
    Я разработчик программного обеспечения. Я люблю делать вещи и всегда любил. Для меня программирование всегда было способом создавать вещи, используя только компьютер и мое воображение...

    Обзор: Машинное обучение: классификация
    Только что закончил третий курс курса 4 часть специализации по машинному обучению . Как и второй курс, он был посвящен низкоуровневой работе алгоритмов машинного обучения. Что касается..

    Разработка расширений Qlik Sense с qExt
    Использование современных инструментов веб-разработки для разработки крутых расширений Вы когда-нибудь хотели кнопку для установки переменной в приложении Qlik Sense? Когда-нибудь просили..