MapReduce — это модель программирования и связанная с ней реализация для обработки и генерации больших наборов данных с параллельным и распределенным алгоритмом на кластере . [1] [2] [3]
Программа MapReduce состоит из процедуры map , которая выполняет фильтрацию и сортировку (например, сортировку студентов по имени в очереди, одна очередь для каждого имени), и метода reduce , который выполняет операцию суммирования (например, подсчет количества студентов в каждой очереди, получение частот имен). «Система MapReduce» (также называемая «инфраструктурой» или «фреймворком») организует обработку, распределяя распределенные серверы, параллельно выполняя различные задачи, управляя всеми коммуникациями и передачей данных между различными частями системы и обеспечивая избыточность и отказоустойчивость .
Модель является специализацией стратегии split-apply-combine для анализа данных. [4] Она вдохновлена функциями map и reduce , обычно используемыми в функциональном программировании , [5] хотя их цель в фреймворке MapReduce не та же самая, что и в их исходных формах. [6] Ключевым вкладом фреймворка MapReduce являются не сами функции map и reduce (которые, например, напоминают операции reduce [8] и scatter [ 9] стандарта Message Passing Interface 1995 года ), а масштабируемость и отказоустойчивость, достигаемые для различных приложений благодаря распараллеливанию. Таким образом, однопоточная реализация MapReduce обычно не быстрее традиционной (не MapReduce) реализации; любые преимущества обычно видны только при многопоточных реализациях на многопроцессорном оборудовании. [10] Использование этой модели выгодно только тогда, когда в игру вступают оптимизированная распределенная операция shuffle (которая снижает стоимость сетевой связи) и функции отказоустойчивости фреймворка MapReduce. Оптимизация стоимости связи имеет важное значение для хорошего алгоритма MapReduce. [11]
Библиотеки MapReduce были написаны на многих языках программирования с различными уровнями оптимизации. Популярная реализация с открытым исходным кодом , которая поддерживает распределенные перемешивания, является частью Apache Hadoop . Название MapReduce изначально относилось к фирменной технологии Google , но с тех пор стало общей торговой маркой . К 2014 году Google больше не использовал MapReduce в качестве своей основной модели обработки больших данных , [12] и разработка Apache Mahout перешла к более эффективным и менее ориентированным на диск механизмам, которые включали в себя полные возможности map и reduce. [13]
MapReduce — это фреймворк для обработки параллелизуемых задач в больших наборах данных с использованием большого количества компьютеров (узлов), которые вместе называются кластером ( если все узлы находятся в одной локальной сети и используют схожее оборудование) или сеткой (если узлы совместно используются в географически и административно распределенных системах и используют более разнородное оборудование). Обработка может выполняться на данных, хранящихся либо в файловой системе (неструктурированные), либо в базе данных (структурированные). MapReduce может использовать локальность данных, обрабатывая их вблизи места хранения, чтобы минимизировать накладные расходы на связь.
Фреймворк (или система) MapReduce обычно состоит из трех операций (или шагов):
map
функцию к локальным данным и записывает вывод во временное хранилище. Главный узел гарантирует, что обрабатывается только одна копия избыточных входных данных.map
) таким образом, что все данные, принадлежащие одному ключу, располагаются на одном и том же рабочем узле.MapReduce позволяет выполнять распределенную обработку операций отображения и редукции. Карты могут выполняться параллельно, при условии, что каждая операция отображения независима от других; на практике это ограничено количеством независимых источников данных и/или количеством ЦП вблизи каждого источника. Аналогично, набор «редукторов» может выполнять фазу редукции, при условии, что все выходные данные операции отображения, которые имеют один и тот же ключ, представлены одному и тому же редуктору в одно и то же время или что функция редукции является ассоциативной . Хотя этот процесс часто кажется неэффективным по сравнению с алгоритмами, которые являются более последовательными (поскольку необходимо запустить несколько экземпляров процесса редукции), MapReduce можно применять к значительно большим наборам данных, чем может обработать один «товар» — большая ферма серверов может использовать MapReduce для сортировки петабайта данных всего за несколько часов. [14] Параллелизм также предлагает некоторую возможность восстановления после частичного отказа серверов или хранилища во время операции: если один преобразователь или редуктор выходит из строя, работу можно перепланировать — при условии, что входные данные все еще доступны.
Другой способ рассмотреть MapReduce — это 5-шаговое параллельное и распределенное вычисление:
Эти пять шагов можно логически рассматривать как выполняемые последовательно — каждый шаг начинается только после завершения предыдущего, — хотя на практике их можно чередовать, если это не влияет на конечный результат.
Во многих ситуациях входные данные могут быть уже распределены ( «разбиты на части» ) среди множества различных серверов, и в этом случае шаг 1 иногда можно значительно упростить, назначив серверы Map, которые будут обрабатывать локально присутствующие входные данные. Аналогично шаг 3 иногда можно ускорить, назначив процессоры Reduce, которые находятся как можно ближе к данным, сгенерированным Map, которые им необходимо обработать.
Функции Map и Reduce из MapReduce определены относительно данных, структурированных в парах (ключ, значение). Map принимает одну пару данных с типом в одном домене данных и возвращает список пар в другом домене:
Map(k1,v1)
→list(k2,v2)
Функция Map применяется параллельно к каждой паре (с ключом k1
) во входном наборе данных. Это создает список пар (с ключом k2
) для каждого вызова. После этого фреймворк MapReduce собирает все пары с тем же ключом ( k2
) из всех списков и группирует их вместе, создавая одну группу для каждого ключа.
Затем функция Reduce применяется параллельно к каждой группе, что, в свою очередь, создает набор значений в том же домене:
Reduce(k2, list (v2))
→ list((k3, v3))
[15]
Каждый вызов Reduce обычно производит либо одну пару ключ-значение, либо пустой возврат, хотя одному вызову разрешено возвращать более одной пары ключ-значение. Возвраты всех вызовов собираются в виде желаемого списка результатов.
Таким образом, фреймворк MapReduce преобразует список пар (ключ, значение) в другой список пар (ключ, значение). [16] Такое поведение отличается от типичного функционального программирования map и reduce, которое принимает список произвольных значений и возвращает одно единственное значение, объединяющее все значения, возвращаемые map.
Необходимо, но недостаточно иметь реализации абстракций map и reduce для реализации MapReduce. Распределенные реализации MapReduce требуют средства соединения процессов, выполняющих фазы Map и Reduce. Это может быть распределенная файловая система . Возможны и другие варианты, такие как прямая потоковая передача от картографов к редукторам или для процессоров картографирования, чтобы они предоставляли свои результаты редукторам, которые запрашивают их.
Канонический пример MapReduce подсчитывает появление каждого слова в наборе документов: [17]
функция map (имя строки, документ строки): // имя: имя документа // документ: содержимое документа для каждого слова w в документе: испускают (w, 1)функция reduce (String word, Iterator partialCounts): // word: слово // partialCounts: список агрегированных частичных счетчиков сумма = 0 для каждого ПК в partialCounts: сумма += пк выдать (слово, сумма)
Здесь каждый документ разбивается на слова, и каждое слово подсчитывается функцией map , используя слово в качестве ключа результата. Фреймворк объединяет все пары с одинаковым ключом и передает их в один и тот же вызов reduce . Таким образом, этой функции нужно просто суммировать все свои входные значения, чтобы найти общее количество появлений этого слова.
В качестве другого примера представьте, что для базы данных с 1,1 миллиарда человек, кто-то хотел бы вычислить среднее количество социальных контактов человека в зависимости от возраста. В SQL такой запрос можно было бы выразить так:
ВЫБЕРИТЕ возраст , СРЕДНИЙ ( контакты ) ИЗ соц . персоны ГРУППИРОВАТЬ ПО ВОЗРАСТУ УПОРЯДОЧИТЬ ПО ВОЗРАСТУ
При использовании MapReduce значения ключа K1 могут быть целыми числами от 1 до 1100, каждое из которых представляет пакет из 1 миллиона записей, значение ключа K2 может быть возрастом человека в годах, и это вычисление может быть выполнено с использованием следующих функций:
Функция Map — это входные данные: целое число K1 от 1 до 1100, представляющее собой пакет из 1 миллиона записей social.person для каждой записи social.person в пакете K1. Пусть Y — возраст человека. Пусть N — количество контактов, с которыми он имеет. Создаем одну выходную запись (Y, (N, 1)) Повторяем Конец функцииФункция Reduce имеет входные данные: возраст (в годах) Y для каждой входной записи (Y,(N,C)) сделать Накопить в S сумму N*C Накопить в C new сумму C повторить пусть A будет S/C new создать одну выходную запись (Y,(A,C new )) конец функции
Обратите внимание, что в функции Reduce C — это количество людей, имеющих в общей сложности N контактов, поэтому в функции Map естественно записать C=1 , поскольку каждая выходная пара относится к контактам одного человека.
Система MapReduce выстроит в ряд 1100 процессоров Map и предоставит каждому из них соответствующий 1 миллион входных записей. Шаг Map создаст 1,1 миллиарда записей (Y,(N,1)) со значениями Y в диапазоне, скажем, от 8 до 103. Затем система MapReduce выстроит в ряд 96 процессоров Reduce, выполнив операцию перетасовки пар ключ/значение, поскольку нам нужно среднее значение по возрасту, и предоставит каждому из них миллионы соответствующих входных записей. Шаг Reduce приведет к значительно сокращенному набору всего из 96 выходных записей (Y,A) , которые будут помещены в конечный файл результатов, отсортированные по Y.
Информация о количестве в записи важна, если обработка сокращается более одного раза. Если бы мы не добавляли количество записей, вычисленное среднее значение было бы неверным, например:
-- вывод карты №1: возраст, количество контактов10, 910, 910, 9
-- вывод карты №2: возраст, количество контактов10, 910, 9
-- вывод карты №3: возраст, количество контактов10, 10
Если мы сократим файлы № 1 и № 2 , то получим новый файл со средним числом контактов 9 для 10-летнего человека ((9+9+9+9+9)/5):
-- уменьшить шаг №1: возраст, среднее количество контактов10, 9
Если мы сократим его с помощью файла №3 , мы потеряем счет того, сколько записей мы уже видели, поэтому в итоге получим в среднем 9,5 контактов для 10-летнего человека ((9+10)/2), что неверно. Правильный ответ: 9,1 66 = 55 / 6 = (9×3+9×2+10×1)/(3+2+1).
Архитектура фреймворка программного обеспечения придерживается принципа открытости-закрытости , где код эффективно делится на неизменяемые замороженные точки и расширяемые горячие точки . Замороженная точка фреймворка MapReduce представляет собой большую распределенную сортировку. Горячие точки, которые определяет приложение, следующие:
Читатель входных данных делит входные данные на соответствующие по размеру «разделы» (на практике обычно от 64 МБ до 128 МБ), а фреймворк назначает один раздел для каждой функции Map . Читатель входных данных считывает данные из стабильного хранилища (обычно распределенной файловой системы ) и генерирует пары ключ/значение.
Типичным примером будет чтение каталога, полного текстовых файлов, и возврат каждой строки в виде записи.
Функция Map берет ряд пар ключ/значение, обрабатывает каждую и генерирует ноль или более выходных пар ключ/значение. Входные и выходные типы карты могут отличаться (и часто отличаются).
Если приложение выполняет подсчет слов, функция map разобьет строку на слова и выведет пару ключ/значение для каждого слова. Каждая выходная пара будет содержать слово в качестве ключа и количество вхождений этого слова в строке в качестве значения.
Каждый выход функции Map выделяется определенному редуктору функцией раздела приложения для целей шардинга . Функция раздела получает ключ и количество редукторов и возвращает индекс нужного редуктора .
Типичное значение по умолчанию — хэширование ключа и использование хэш-значения по модулю числа редукторов . Важно выбрать функцию разделения, которая обеспечивает приблизительно равномерное распределение данных по шарду для целей балансировки нагрузки , в противном случае операция MapReduce может быть приостановлена в ожидании завершения работы медленных редукторов (т. е. редукторов, назначающих большие доли неравномерно разделенных данных).
Между этапами map и reduce данные перемешиваются ( параллельно сортируются/обмениваются между узлами) для перемещения данных из узла map, который их создал, в шард, в котором они будут сокращены. Перемешивание иногда может занять больше времени, чем время вычислений, в зависимости от пропускной способности сети, скорости ЦП, произведенных данных и времени, затраченного на вычисления map и reduce.
Входные данные для каждого Reduce извлекаются из машины, на которой запущен Map , и сортируются с использованием функции сравнения приложения .
Фреймворк вызывает функцию Reduce приложения один раз для каждого уникального ключа в отсортированном порядке. Reduce может перебирать значения, связанные с этим ключом, и выдавать ноль или более выходов.
В примере с подсчетом слов функция Reduce принимает входные значения, суммирует их и генерирует единый вывод слова и окончательной суммы.
Output Writer записывает выходные данные Reduce в стабильное хранилище.
Свойства моноидов являются основой для обеспечения корректности операций MapReduce. [18] [19]
В пакете Algebird [20] реализация Map/Reduce на Scala явно требует тип класса моноида. [21]
Операции MapReduce имеют дело с двумя типами: типом A входных данных, которые отображаются, и типом B выходных данных, которые сокращаются.
Операция Map принимает отдельные значения типа A и создает для каждого a:A значение b:B ; Операция Reduce требует бинарной операции, определенной для значений типа B ; она состоит из свертывания всех доступных b:B в одно значение.
С точки зрения основных требований, любая операция MapReduce должна включать возможность произвольной перегруппировки сокращаемых данных. Такое требование сводится к двум свойствам операции •:
Второе свойство гарантирует, что при распараллеливании на нескольких узлах узлы, не имеющие данных для обработки, не окажут влияния на результат.
Эти два свойства сводятся к наличию моноида ( B , •, e ) на значениях типа B с операцией • и с нейтральным элементом e .
Нет никаких требований к значениям типа A ; произвольная функция A → B может быть использована для операции Map . Это означает, что у нас есть катаморфизм A* → ( B , •, e ). Здесь A* обозначает звезду Клини , также известную как тип списков над A .
Сама по себе операция Shuffle не имеет отношения к сути MapReduce, она нужна для распределения вычислений по облаку.
Из вышесказанного следует, что не каждая бинарная операция Reduce будет работать в MapReduce. Вот контрпримеры:
Программы MapReduce не гарантированно быстрые. Главное преимущество этой модели программирования заключается в использовании оптимизированной операции перемешивания платформы и необходимости писать только части Map и Reduce программы. На практике автор программы MapReduce, однако, должен учитывать шаг перемешивания; в частности, функция разделения и объем данных, записанных функцией Map, могут оказать большое влияние на производительность и масштабируемость. Дополнительные модули, такие как функция Combiner, могут помочь сократить объем данных, записываемых на диск и передаваемых по сети. Приложения MapReduce могут достигать сублинейного ускорения при определенных обстоятельствах. [22]
При проектировании алгоритма MapReduce автору необходимо выбрать хороший компромисс [11] между затратами на вычисления и коммуникацию. Затраты на коммуникации часто доминируют над затратами на вычисления, [11] [22] и многие реализации MapReduce разработаны для записи всех коммуникаций в распределенное хранилище для восстановления после сбоя.
При настройке производительности MapReduce необходимо учитывать сложность отображения, перемешивания, сортировки (группировки по ключу) и сокращения. Объем данных, создаваемых картогенераторами, является ключевым параметром, который переносит большую часть вычислительных затрат между отображением и сокращением. Сокращение включает сортировку (группировку ключей), которая имеет нелинейную сложность. Следовательно, небольшие размеры разделов сокращают время сортировки, но есть компромисс, поскольку наличие большого количества редукторов может быть непрактичным. Влияние размера разделяемого блока незначительно (если только он не выбран особенно плохо, скажем, <1 МБ). Выигрыш от чтения нагрузки некоторыми картогенераторами с локальных дисков в среднем незначителен. [23]
Для процессов, которые завершаются быстро, и где данные помещаются в основную память одной машины или небольшого кластера, использование фреймворка MapReduce обычно неэффективно. Поскольку эти фреймворки предназначены для восстановления после потери целых узлов во время вычислений, они записывают промежуточные результаты в распределенное хранилище. Такое восстановление после сбоя обходится дорого и окупается только тогда, когда вычисления затрагивают много компьютеров и требуют длительного времени выполнения. Задачу, которая завершается за секунды, можно просто перезапустить в случае ошибки, а вероятность сбоя хотя бы одной машины быстро растет с размером кластера. В таких задачах реализации, сохраняющие все данные в памяти и просто перезапускающие вычисления при сбоях узлов или — когда данные достаточно малы — нераспределенные решения часто будут быстрее, чем система MapReduce.
MapReduce достигает надежности путем распределения ряда операций над набором данных по каждому узлу в сети. Ожидается, что каждый узел будет периодически отчитываться о выполненной работе и обновлениях статуса. Если узел замолкает дольше этого интервала, главный узел (аналогично главному серверу в файловой системе Google ) регистрирует узел как мертвый и отправляет назначенную узлу работу другим узлам. Отдельные операции используют атомарные операции для именования выходных файлов в качестве проверки, чтобы гарантировать отсутствие параллельных конфликтующих потоков. Когда файлы переименовываются, их также можно копировать под другим именем в дополнение к имени задачи (что позволяет избежать побочных эффектов ).
Операции reduce работают примерно так же. Из-за их худших свойств в отношении параллельных операций главный узел пытается запланировать операции reduce на том же узле или в той же стойке, что и узел, содержащий обрабатываемые данные. Это свойство желательно, поскольку оно сохраняет пропускную способность в магистральной сети центра обработки данных.
Реализации не обязательно являются высоконадежными. Например, в старых версиях Hadoop NameNode был единственной точкой отказа для распределенной файловой системы. Более поздние версии Hadoop имеют высокую доступность с активным/пассивным отказоустойчивым режимом для «NameNode» .
MapReduce полезен в широком спектре приложений, включая распределенный поиск на основе шаблонов, распределенную сортировку, обращение графа веб-ссылок, разложение сингулярных значений, [24] статистику журнала веб-доступа, построение инвертированного индекса , кластеризацию документов , машинное обучение , [25] и статистический машинный перевод . Более того, модель MapReduce была адаптирована к нескольким вычислительным средам, таким как многоядерные и многоядерные системы, [26] [27] [28] настольные сетки, [29] многокластерные, [30] среды добровольных вычислений, [31] динамические облачные среды, [32] мобильные среды, [33] и среды высокопроизводительных вычислений. [34]
В Google MapReduce использовался для полной регенерации индекса Google Всемирной паутины . Он заменил старые специальные программы, которые обновляли индекс и запускали различные анализы. [35] С тех пор разработка в Google перешла к таким технологиям, как Percolator, FlumeJava [36] и MillWheel , которые предлагают потоковую работу и обновления вместо пакетной обработки, что позволяет интегрировать «живые» результаты поиска без перестройки всего индекса. [37]
Стабильные входы и выходы MapReduce обычно хранятся в распределенной файловой системе . Временные данные обычно хранятся на локальном диске и извлекаются удаленно редукторами.
Дэвид ДеВитт и Майкл Стоунбрейкер , специалисты по параллельным базам данных и архитектурам без разделения ресурсов , критиковали широту проблем, для которых может использоваться MapReduce. [38] Они назвали его интерфейс слишком низкоуровневым и усомнились в том, что он действительно представляет собой смену парадигмы, как утверждают его сторонники. [39] Они оспорили заявления сторонников MapReduce о новизне, сославшись на Teradata как на пример предшествующего уровня техники , который существует уже более двух десятилетий. Они также сравнили программистов MapReduce с программистами CODASYL , отметив, что оба «пишут на языке низкого уровня, выполняя низкоуровневые манипуляции записями». [39] Использование MapReduce входных файлов и отсутствие поддержки схем не позволяет улучшить производительность, обеспечиваемую обычными функциями системы баз данных, такими как B-деревья и хэш-секционирование , хотя такие проекты, как Pig (или PigLatin) , Sawzall , Apache Hive , [40] HBase [41] и Bigtable [41] [42] решают некоторые из этих проблем.
Грег Йоргенсен написал статью, в которой отвергает эти взгляды. [43] Йоргенсен утверждает, что весь анализ ДеВитта и Стоунбрейкера беспочвенен, поскольку MapReduce никогда не проектировался и не предназначался для использования в качестве базы данных.
Впоследствии в 2009 году ДеВитт и Стоунбрейкер опубликовали подробное сравнительное исследование, сравнивающее производительность подходов Hadoop MapReduce и RDBMS для нескольких конкретных задач. [44] Они пришли к выводу, что реляционные базы данных предлагают реальные преимущества для многих видов использования данных, особенно при сложной обработке или когда данные используются в масштабах всего предприятия, но что MapReduce может быть проще для пользователей в использовании для простых или одноразовых задач обработки.
Парадигма программирования MapReduce также была описана в диссертации Дэнни Хиллиса 1985 года [45], предназначенной для использования на Connection Machine , где она называлась «xapping/reduction» [46] и полагалась на специальное оборудование этой машины для ускорения как map, так и reduce. Диалект, в конечном итоге используемый для Connection Machine, StarLisp 1986 года , имел параллельный *map
и reduce!!
, [47] который, в свою очередь, был основан на Common Lisp 1984 года , который имел непараллельный map
и reduce
встроенный. [48] Древовидный подход, который архитектура гиперкуба Connection Machine использует для выполнения во времени [49] , по сути, тот же самый, что и подход, упомянутый в статье Google как предыдущая работа. [3] : 11reduce
В 2010 году Google получила то, что описывается как патент на MapReduce. Патент, поданный в 2004 году, может охватывать использование MapReduce программным обеспечением с открытым исходным кодом, таким как Hadoop , CouchDB и другими. В Ars Technica редактор признал роль Google в популяризации концепции MapReduce, но усомнился в том, является ли патент действительным или новым. [50] [51] В 2013 году в рамках своего «Обязательства не утверждать открытый патент (OPN)» Google пообещала использовать патент только в оборонительных целях. [52] [53] Ожидается, что срок действия патента истечет 23 декабря 2026 года. [54]
Задачи MapReduce должны быть написаны как ациклические программы потока данных, то есть mapper без состояния, за которым следует reducer без состояния, которые выполняются планировщиком пакетных заданий. Эта парадигма затрудняет повторные запросы наборов данных и накладывает ограничения, которые ощущаются в таких областях, как обработка графов [55], где итерационные алгоритмы, которые повторно посещают один рабочий набор несколько раз, являются нормой, а также, при наличии данных на диске с высокой задержкой , даже в области машинного обучения , где требуются множественные проходы по данным, хотя алгоритмы могут допускать последовательный доступ к данным при каждом проходе. [56]
Реализация MapReduce в MongoDB, по-видимому, мало связана с map reduce. Поскольку, насколько я понял, она однопоточная, в то время как map-reduce предназначена для высокопараллельного использования в кластере. ... MongoDB MapReduce является однопоточной на одном сервере...
«Мы больше не используем MapReduce» [Урс Хёльцле, старший вице-президент по технической инфраструктуре в Google]
По состоянию на октябрь Google запускал около 3000 вычислительных задач в день через MapReduce, что составляет тысячи машино-дней, согласно презентации Дина. Помимо прочего, эти пакетные процедуры анализируют последние веб-страницы и обновляют индексы Google.