Saturday, January 8, 2011

Um algoritmo para iterar duas listas simultaneamente enquanto processa os elementos

Os próprios docs no código explicam os detalhes. A ideia é ser uma espécie de algoritmo zip+map, onde o map é customizado para os elementos comuns às duas listas e distintos em cada lista.

Precisei desse algoritmo para realizar uma sincronização de dados.

Ainda, os testes são legais para entender o conceito do Objeto Dublê Spy. Muitas ferramentas de Objetos Dublês (conhecidas como ferramentas de Mock) conseguem abstrair vários conceitos de dublês de maneira transparente. Mas vale a pena entender os tipos conhecidos, até para saber quando se deve utilizar cada técnica.

Apenas por curiosidade, a biblioteca Mockito para Java (a mais legal por sinal) é implementada utilizando spies, não mocks. Contudo, para o desenvolvedor os conceitos se misturam e idealmente isso nem deveria fazer diferença.



Código:

class MultipleIterator(object):
    """Walk two lists in lockstep, dispatching on how their elements compare.

    Important:
      1) The elements of both lists must be mutually comparable with
         ==, < and >.
      2) Both lists are sorted in place: the very list objects passed to
         the constructor are reordered, which avoids copying them.

    ``run`` merges the two sorted lists and, at each step, calls exactly
    one of the following hooks:

        def process_equal_elements(self, element1, element2): pass
        def process_element_exclusive_in_first_list(self, element1): pass
        def process_element_exclusive_in_second_list(self, element2): pass

    Inherit from this class and override the hooks as needed; the default
    implementations do nothing. The algorithm is a mix of the built-in
    functions 'zip' and 'map'.
    """

    def __init__(self, list1, list2):
        """Store both lists and sort them in place.

        list1, list2 -- mutable lists of mutually comparable elements.
        """
        self.sorted_list1 = list1
        self.sorted_list2 = list2
        self.sorted_list1.sort()
        self.sorted_list2.sort()

    def run(self):
        """Traverse both sorted lists once, calling one hook per step.

        With n = len(sorted_list1) and m = len(sorted_list2), the loop
        performs at least max(n, m) and at most n + m steps. This does
        not include the cost of the sort done in the constructor.

        The traversal is iterative, so the size of the lists is not
        limited by the interpreter's recursion limit.
        """
        index1 = 0
        index2 = 0
        while True:
            # Lengths are re-read on every step, so a hook that changes
            # the lists is observed on the next step.
            list1_exhausted = index1 >= len(self.sorted_list1)
            list2_exhausted = index2 >= len(self.sorted_list2)

            if list1_exhausted and list2_exhausted:
                return

            if list2_exhausted:
                # Only the first list has elements left.
                element1 = self.sorted_list1[index1]
                self.process_element_exclusive_in_first_list(element1)
                index1 += 1
                continue

            if list1_exhausted:
                # Only the second list has elements left.
                element2 = self.sorted_list2[index2]
                self.process_element_exclusive_in_second_list(element2)
                index2 += 1
                continue

            element1 = self.sorted_list1[index1]
            element2 = self.sorted_list2[index2]
            if element1 == element2:
                self.process_equal_elements(element1, element2)
                index1 += 1
                index2 += 1
            elif element1 > element2:
                # The smaller element cannot appear later in the other
                # (sorted) list, so it is exclusive to its own list.
                self.process_element_exclusive_in_second_list(element2)
                index2 += 1
            else:  # element1 < element2
                self.process_element_exclusive_in_first_list(element1)
                index1 += 1

    def process_equal_elements(self, element1, element2):
        """Hook called when the same element is present in both lists."""
        pass

    def process_element_exclusive_in_first_list(self, element1):
        """Hook called for an element found only in the first list."""
        pass

    def process_element_exclusive_in_second_list(self, element2):
        """Hook called for an element found only in the second list."""
        pass
Testes:

import unittest

from multiple_iterator import MultipleIterator


class MultipleIteratorSpy(MultipleIterator):
    """Spy test double that counts how often each hook is invoked.

    The spy deliberately does not inherit from unittest.TestCase: it is
    built with the two lists as constructor arguments, so TestCase's own
    initialisation would never run and its assertion helpers could not be
    relied on. ``verify_calls`` raises AssertionError directly instead.
    """

    def __init__(self, sorted_list1, sorted_list2):
        super(MultipleIteratorSpy, self).__init__(sorted_list1, sorted_list2)
        self.process_equal_elements_count = 0
        self.process_element_exclusive_in_first_list_count = 0
        self.process_element_exclusive_in_second_list_count = 0

    def process_equal_elements(self, element1, element2):
        self.process_equal_elements_count += 1

    def process_element_exclusive_in_first_list(self, element1):
        self.process_element_exclusive_in_first_list_count += 1

    def process_element_exclusive_in_second_list(self, element2):
        self.process_element_exclusive_in_second_list_count += 1

    def verify_calls(self, a, b, c):
        """Check the recorded call counts.

        a -- expected calls to process_equal_elements
        b -- expected calls to process_element_exclusive_in_first_list
        c -- expected calls to process_element_exclusive_in_second_list

        Raises AssertionError when any count differs.
        """
        expected = (a, b, c)
        actual = (
            self.process_equal_elements_count,
            self.process_element_exclusive_in_first_list_count,
            self.process_element_exclusive_in_second_list_count,
        )
        if actual != expected:
            raise AssertionError(
                "expected (equal, only first, only second) calls %r, got %r"
                % (expected, actual)
            )


class MultipleIteratorTests(unittest.TestCase):
    """Exercise MultipleIterator.run through the counting spy."""

    def test_two_empty_lists_should_execute_nothing(self):
        sorted_list1 = []
        sorted_list2 = []
        multipleiterator = MultipleIteratorSpy(sorted_list1, sorted_list2)
        multipleiterator.run()
        multipleiterator.verify_calls(0, 0, 0)

    def test_list1_with_1_element_should_execute_process_element_exclusive_in_first_list(self):
        sorted_list1 = [1]
        sorted_list2 = []
        multipleiterator = MultipleIteratorSpy(sorted_list1, sorted_list2)
        multipleiterator.run()
        multipleiterator.verify_calls(0, 1, 0)

    def test_list2_with_1_element_should_execute_process_element_exclusive_in_second_list(self):
        sorted_list1 = []
        sorted_list2 = [1]
        multipleiterator = MultipleIteratorSpy(sorted_list1, sorted_list2)
        multipleiterator.run()
        multipleiterator.verify_calls(0, 0, 1)

    def test_lists_with_1_equal_element_should_execute_process_equal_elements(self):
        sorted_list1 = [1]
        sorted_list2 = [1]
        multipleiterator = MultipleIteratorSpy(sorted_list1, sorted_list2)
        multipleiterator.run()
        multipleiterator.verify_calls(1, 0, 0)

    def test_lists_with_3_equals_elements_should_execute_process_equal_elements_three_times(self):
        sorted_list1 = [1, 2, 3]
        sorted_list2 = [1, 2, 3]
        multipleiterator = MultipleIteratorSpy(sorted_list1, sorted_list2)
        multipleiterator.run()
        multipleiterator.verify_calls(3, 0, 0)

    def test_both_lists_with_exclusive_elements_should_execute_both_methods_of_exclusive_elements(self):
        sorted_list1 = [2]
        sorted_list2 = [1]
        multipleiterator = MultipleIteratorSpy(sorted_list1, sorted_list2)
        multipleiterator.run()
        multipleiterator.verify_calls(0, 1, 1)

    def test_both_lists_with_three_exclusive_elements(self):
        sorted_list1 = [4, 5, 6]
        sorted_list2 = [1, 2, 3]
        multipleiterator = MultipleIteratorSpy(sorted_list1, sorted_list2)
        multipleiterator.run()
        multipleiterator.verify_calls(0, 3, 3)

        # Same data with the roles of the lists swapped.
        multipleiterator = MultipleIteratorSpy(sorted_list2, sorted_list1)
        multipleiterator.run()
        multipleiterator.verify_calls(0, 3, 3)

    def test_both_lists_with_exclusive_elements_different_size(self):
        sorted_list1 = [2, 3]
        sorted_list2 = [1]
        multipleiterator = MultipleIteratorSpy(sorted_list1, sorted_list2)
        multipleiterator.run()
        multipleiterator.verify_calls(0, 2, 1)

    def test_complex_example_with_equal_and_different_elements(self):
        sorted_list1 = [1, 2, 3, 4, 5, 8]
        sorted_list2 = [1, 3, 4, 6, 7, 8]
        multipleiterator = MultipleIteratorSpy(sorted_list1, sorted_list2)
        multipleiterator.run()
        multipleiterator.verify_calls(4, 2, 2)

    def test_complex_example_with_equal_and_different_elements_and_different_list_sizes(self):
        sorted_list1 = [1, 2, 3, 7, 8]
        sorted_list2 = [1, 4, 5]
        multipleiterator = MultipleIteratorSpy(sorted_list1, sorted_list2)
        multipleiterator.run()
        multipleiterator.verify_calls(1, 4, 2)


if __name__ == "__main__":
    unittest.main()

1 comment:

  1. Primeiro, um amigo (Juarez) converteu o algoritmo para iterativo, para evitar problemas com recursões muito profundas. Basicamente, é trocar os return por continue.

    Outro comentário é que acho que o nome MultipleIterator não está bacana. Dei esse nome pensando num Zip+Map, mas, pensando agora, acho que o melhor seria algo como ListSynchronizer.

    ReplyDelete