- Publié le 25 août 2018
- Marqué avecamazon-dynamodb,aws,échelle
Au travail, nous utilisons DynamoDB pour stocker de grandes collections d'enregistrements - ceux-ci sont traités par lepipeline de cataloguequi alimente notre API, qui alimente finalement la recherche sur le nouveau site Web Wellcome Collection.
Tous nos modèles sont définis commeClasses de cas Scala, et on utilisescannonspour interagir avec DynamoDB. Scanamo est un wrapper autour du SDK DynamoDB qui masque le travail de sérialisation et de désérialisation des classes de cas dans le format interne DynamoDB.
Lorsque nous modifions le pipeline, nous voulons retraiter tous les enregistrements existants dans DynamoDB (nous appelons cela « réindexation »). Si vous souhaitez itérer sur les enregistrements dans DynamoDB, vous devez effectuer uneOpération de numérisation. Une analyse renvoie les enregistrements dans l'ordre, de sorte que vous ne pouvez exécuter qu'un seul travailleur à la fois - c'est assez lent. Nous voulons traiter la table en parallèle, nous avons donc un mécanisme de bricolage pour diviser la table en « fragments », puis nous traitons chaque fragment séparément.
Les tables DynamoDB peuvent produireun flux d'événementsdes mises à jour du tableau. Nous connectons ce flux à une fonction Lambda, qui sélectionne une « partition de réindexation » pour une ligne et réécrit cette partition dans la table. L'ID de partition est copié dans unindice secondaire global (GSI), ce qui nous permet de déterminer efficacement quelles lignes se trouvent dans un fragment de réindexation particulier.
Lorsque nous voulons réindexer la table, nous exécutons un travailleur par fragment de réindexation - chaque ligne se trouve dans exactement un fragment de réindexation, et le GSI nous permet de rechercher le contenu de chaque fragment. Il s'exécute beaucoup plus rapidement que le traitement de la table en séquence.
C'est aussi assez cassant. Il s'appuie sur le flux DynamoDB et Lambda qui fonctionnent correctement (les deux pouvant être instables), c'est une infrastructure supplémentaire que nous devons entretenir et nous sommes bloqués avec une taille de partition fixe. Si nous décidons de modifier la taille du fragment plus tard, nous devons revenir en arrière et repartitionner l'intégralité de la table.
Cela a une odeur deSyndrome non inventé ici. Nous ne pouvons pas être les seuls à vouloir traiter une table DynamoDB en parallèle !
Hier, je suis tombé sur un ancien article de blogannonçant des analyses parallèlesdans DynamoDB. C'est exactement ce dont nous avons besoin - c'est une API prise en charge, qui ne nécessite pas d'infrastructure supplémentaire de notre part et qui nous permet de choisir une taille de fragment différente à chaque analyse. Ça vaut le coup d'oeil.
Je n'ai pas trouvé d'implémentation d'analyse parallèle qui utilise également Scanamo et les classes de cas, j'ai donc décidé d'écrire la mienne. (Je l'ai fait sur Google avant de plonger !) C'est un composant autonome utile, alors j'ai pensé écrire ce que j'ai trouvé.
Remarque : il s'agit d'un prototype et non d'un code de production. Nous le mettrons probablement en production à un moment donné, mais je ne sais pas combien de temps cela prendra.
Création d'un client API
Avant d'écrire un code d'analyse parallèle, nous avons besoin d'un client API pour travailler avec l'API DynamoDB :
importer com.amazonaws.auth.{AWSStaticCredentialsProviderAWSStaticCredentialsProvider, Informations d'identification AWS de base}importer com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilderval AWS_ACCESS_ID = "xxx"val AWS_SECRET_ACCESS_KEY = "xxx"val AWS_REGION = "eu-ouest-1"val dynamoDbClient = AmazonDynamoDBClientBuilder .standard .avecCredentials( nouveau AWSStaticCredentialsProviderAWSStaticCredentialsProvider( nouveau Informations d'identification AWS de base(AWS_ACCESS_ID, AWS_SECRET_ACCESS_KEY) ) ) .avecRégion(AWS_REGION) .construire()
Il existe de nombreuses façons d'obtenir des informations d'identification dans le kit AWS SDK. J'utilise ici des clés codées en dur car elles sont les plus simples pour un exemple autonome.
Répartition étape par étape
Il y a unExemple Java d'analyse parallèledans la documentation AWS, qui effectue l'analyse, mais n'imprime que le résultat. Je l'ai utilisé comme point de départ, mais j'ai ensuite dû creuser plus profondément pour trouver comment transmettre le résultat comme une valeur utile.
UNbalayage parallèledivise le tableau en "segments". Vous créez une collection de nœuds de calcul, chacun d'entre eux faisant sa propre requête Scan avec deux paramètres :
Nombre total de segments
est le nombre total de segments. Chaque travailleur doit utiliser la même valeur.Segment
est l'index du segment analysé par ce travailleur particulier - notez que cette valeur est indexée à 0. Chaque travailleur transmet une valeur différente.
Ces paramètres peuvent être passés comme une instance deScanSpec, alors construisons cela :
importer com.amazonaws.services.dynamodbv2.document.spec.ScanSpecval scanSpec = nouveau ScanSpec() .avecTotalSegments(totalSegments) // totalSegments : entier .avecSegment(segment) // segment : entier
Ensuite, pour effectuer le scan lui-même, nous utilisons l'API Document et passons ce ScanSpec en paramètre :
importer com.amazonaws.services.dynamodbv2.document.{ DynamoDB, Collection d'articles, Résultat de l'analyse}val documentApiClient = nouveau DynamoDB(dynamoDbClient)val tableau = documentApiClient.obtenirTable(nom de la table) // nomtable : chaîneval articleCollection: Collection d'articles[Résultat de l'analyse] = tableau.analyse(scanSpec)
Je soupçonne que cela ne renvoie que la première page de résultats - je n'ai joué avec cela que dans des tables de jouets avec une poignée d'exemples de lignes, pas nos plus grandes bases de données. Si je l'utilise en production, je souhaiterai des tests et des vérifications autour de la pagination. (Ou assurez-vous que je choisis suffisamment de segments pour que chaque segment tienne sur une seule page !)
En jouant un peu dans IntelliJ pour voir quelles méthodes j'avais à ma disposition, je suis finalement tombé sur ce qui suit pour transformer la collection en une liste Scala :
importer com.amazonaws.services.dynamodbv2.document.Itemimporter scala.collection.JavaConverters._val articles: Liste[Article] = articleCollection.à l'échelle.lister
Il s'agit d'un "élément" dans le sens d'une collection générique de paires clé-valeur, mais ce n'est pas un type Scala approprié, ce que je veux vraiment. Il s'agit d'une représentation DynamoDB interne d'une ligne.
Je suis allé fouiller dans Scanamo pour voir comment ils sérialisent un article en tant que classe de cas. Je n'ai pas tout à fait trouvé, mais en cherchantdans ScanamoFree.scala, je suis tombé sur des indices de deux manières :
objet ScanamoGratuit { ... définitivement obtenir[J](nom de la table: Chaîne)(clé: UniqueKey[_])( implicite pi: DynamoFormat[J]): ScanamoOps[Option[Soit[DynamoReadError,J]]] = pour { résolution <- ScanamoOps.obtenir(nouveau GetItemRequest().withTableName(nom de la table).avec clé(clé.asAVMap.commeJava)) } rendement Option(résolution.obtenir l'article).carte(lire[J]) ... définitivement lire[J](m: java.util.Map[Chaîne,Valeur d'attribut])(implicite F: DynamoFormat[J]): Soit[DynamoReadError,J] = F.lire(nouveau Valeur d'attribut().avec M(m))}
Dans leobtenir()
, il semble que le corps de la compréhension appelle le SDK DynamoDB Java (GetItemRequest est un cadeau mort), puis il le transmet à unlire()
méthode qui le décompresse en tant que classe de cas. Lelire()
La méthode ne prend pas tout à fait un élément, mais si je peux obtenir la carte String/AttributeValue d'un élément, alors je suis en affaires.
Je suis tombé sur la méthode dont j'ai besoindans un message Stack Overflow:
importer com.amazonaws.services.dynamodbv2.document.internal.InternalUtilsimporter com.amazonaws.services.dynamodbv2.model.AttributeValueimporter com.gu.scanamo.ScanamoFreeimporter com.gu.scanamo.error.DynamoReadErrorval casClasses: Liste[Soit[DynamoReadError,J]] = articles.carte { article => val attributeValueMap: java.util.Map[Chaîne,Valeur d'attribut] Utilsinternes.toAttributeValues(article) ScanamoGratuit.lire[J](attributeValueMap)}
Scanamo lancera une DynamoReadError s'il ne peut pas analyser la ligne en tant que classe de cas - par exemple, si un champ est manquant ou le mauvais type. Vous pouvez extraire l'instance soit en utilisant.right.get
, ou faire une correspondance de modèle sur le résultat.
C'est ce que je voulais quand j'ai commencé - une liste d'instances d'une classe de cas, récupérée à l'aide d'une analyse parallèle.
Changements possibles
Avec le recul, je ne suis pas sûr que cela doive utiliser l'API Document. Je l'ai copié à partir de l'exemple Java, mais il existe également un ScanRequest (similaire à GetItemRequest) qui vous permet de définir le nom de la table, le segment et le nombre total de segments - je n'ai pas vu que je fouinais dans Scanamo.
Ensuite, il y a leobtenir()
méthode sur ScanamoFree qui utilise un GetItemRequest en interne. Je me demande si vous pourriez écrire une méthode similaire qui utilise un ScanRequest en interne et l'ajouter à Scanamo ? J'avoue que les composants internes de Scanamo me laissent toujours un peu confus, donc je n'ai pas approfondi ce sujet.
Enfin, il y a le comportement de pagination, qui est une inconnue complète. Je n'ai pas encore essayé de l'exécuter sur une très grande table, et je voudrais quelques tests autour de la pagination avant de l'exécuter en prod.
Dernier exemple
Cela combine tous les extraits de code en un seul exemple exécutable :
importer com.amazonaws.auth.{AWSStaticCredentialsProviderAWSStaticCredentialsProvider, Informations d'identification AWS de base}importer com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilderimporter com.amazonaws.services.dynamodbv2.document.{ DynamoDB, Article, Collection d'articles, Résultat de l'analyse}importer com.amazonaws.services.dynamodbv2.document.internal.InternalUtilsimporter com.amazonaws.services.dynamodbv2.document.spec.ScanSpecimporter com.amazonaws.services.dynamodbv2.model.AttributeValueimporter com.gu.scanamo.{DynamoFormat, ScanamoGratuit}importer com.gu.scanamo.error.DynamoReadErrorimporter scala.collection.JavaConverters._importer scala.concurrent.Futureval AWS_ACCESS_ID = "xxx"val AWS_SECRET_ACCESS_KEY = "xxx"val AWS_REGION = "eu-ouest-1"val dynamoDbClient = AmazonDynamoDBClientBuilder .standard .avecCredentials( nouveau AWSStaticCredentialsProviderAWSStaticCredentialsProvider( nouveau Informations d'identification AWS de base(AWS_ACCESS_ID, AWS_SECRET_ACCESS_KEY) ) ) .avecRégion(AWS_REGION) .construire()définitivement Balayage parallèle[J]( nom de la table: Chaîne, totalSegments: Int, segment: Int)( implicite pi: DynamoFormat[J]): Avenir[Liste[Soit[DynamoReadError,J]]] = { val scanSpec = nouveau ScanSpec() .avecTotalSegments(totalSegments) .avecSegment(segment) val documentApiClient = nouveau DynamoDB(dynamoDbClient) val tableau = documentApiClient.obtenirTable(nom de la table) Avenir { val articleCollection: Collection d'articles[Résultat de l'analyse] = tableau.analyse(scanSpec) val articles: Liste[Article] = articleCollection.à l'échelle.lister articles.carte { article => val attributeValueMap: java.util.Map[Chaîne,Valeur d'attribut] Utilsinternes.toAttributeValues(article) ScanamoGratuit.lire[J](attributeValueMap) } }}
J'ai enveloppé les appels d'API DynamoDB dans un Future pour une API non bloquante, qui semble plus Scala-y.
N'oubliez pas qu'il ne s'agit que d'un code prototype, pas d'une solution finale. Si je finis par l'utiliser correctement, je le ferai d'abord examiner et tester. Même ainsi, j'espère que c'était instructif - je m'attends à en utiliser à nouveau des morceaux, même si ce n'est pas pour ce problème précis.