Dans le cadre d’un récent projet de mise en place d’Azure Data Lake, j’ai dû réaliser un projet C# permettant la génération de ma fabrique de données (ADF V2). Or, la documentation officielle de Microsoft est pour l’heure peu fournie en exemples ; voici donc quelques scripts qui pourront peut-être vous faire gagner du temps !

Créer une Azure Data Factory & Ajouter un Integration Runtime

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Rest;
using Microsoft.Azure.Management.DataFactory;
using Microsoft.Azure.Management.DataFactory.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory;
using Microsoft.Rest.Serialization;

namespace AzureDataFactoryLoadingLake
{
    /// <summary>
    /// Authenticates against Azure with application credentials, then creates or updates
    /// a data factory and a self-hosted integration runtime inside it.
    /// All of this work is triggered from the constructor.
    /// </summary>
    class ADF
    {
        // Management client shared by every call made from this class.
        public DataFactoryManagementClient client;

        // Name of the data factory to create or update.
        public string dataFactoryName;

        // Resource group that hosts the data factory.
        public string resourceGroup;

        // Location assigned to the factory when it is created.
        public string region;

        // Name of the integration runtime registered in the factory.
        public string integrationRuntimeName;

        // Tenant appended to the login authority URL.
        public string tenantID;

        // Application id used to build the client credential.
        public string applicationId;

        // Subscription the management client operates on.
        public string subscriptionId;

        // Key paired with applicationId to build the client credential.
        public string authenticationKey;

        /// <summary>
        /// Stores the settings, acquires a management token, builds the client and then
        /// creates or updates the factory followed by the integration runtime.
        /// </summary>
        /// <param name="tenantID">Tenant appended to the login authority URL.</param>
        /// <param name="resourceGroup">Resource group hosting the factory.</param>
        /// <param name="applicationId">Application id of the client credential.</param>
        /// <param name="authenticationKey">Key of the client credential.</param>
        /// <param name="subscriptionId">Subscription set on the management client.</param>
        /// <param name="dataFactoryName">Name of the factory to create or update.</param>
        /// <param name="region">Location of the factory.</param>
        /// <param name="integrationRuntimeName">Name of the integration runtime to create or update.</param>
        public ADF(
            string tenantID,
            string resourceGroup,
            string applicationId,
            string authenticationKey,
            string subscriptionId,
            string dataFactoryName,
            string region,
            string integrationRuntimeName)
        {
            // Keep every setting on the instance; the Create* methods read the fields.
            this.resourceGroup = resourceGroup;
            this.dataFactoryName = dataFactoryName;
            this.region = region;
            this.tenantID = tenantID;
            this.applicationId = applicationId;
            this.subscriptionId = subscriptionId;
            this.integrationRuntimeName = integrationRuntimeName;
            this.authenticationKey = authenticationKey;

            // Acquire a token for the management endpoint (blocking call) and wrap it
            // in the credentials object expected by the management client.
            string authority = "https://login.windows.net/" + tenantID;
            var authContext = new AuthenticationContext(authority);
            var appCredential = new ClientCredential(applicationId, authenticationKey);
            AuthenticationResult token = authContext.AcquireTokenAsync("https://management.azure.com/", appCredential).Result;
            ServiceClientCredentials serviceCredentials = new TokenCredentials(token.AccessToken);

            this.client = new DataFactoryManagementClient(serviceCredentials)
            {
                SubscriptionId = subscriptionId
            };

            Console.WriteLine("Creating or updating data factory [" + dataFactoryName + "] ...");
            CreateFactory();

            Console.WriteLine("Creating or updating integration runtime (Gateway) [" + integrationRuntimeName + "] ...");
            CreateIntegrationRuntimes();
        }

        /// <summary>
        /// Creates or updates the data factory, prints its JSON definition and polls
        /// once per second while the provisioning state is "PendingCreation".
        /// </summary>
        public void CreateFactory()
        {
            var factoryDefinition = new Factory
            {
                Location = region,
                Identity = new FactoryIdentity()
            };

            client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, factoryDefinition);
            Console.WriteLine(SafeJsonConvert.SerializeObject(factoryDefinition, client.SerializationSettings));

            // Print one "*" per polling round until the state moves on.
            while (true)
            {
                string state = client.Factories.Get(resourceGroup, dataFactoryName).ProvisioningState;
                if (state != "PendingCreation")
                {
                    break;
                }

                System.Threading.Thread.Sleep(1000);
                Console.WriteLine("*");
            }
        }

        /// <summary>
        /// Creates or updates a self-hosted integration runtime in the factory, prints its
        /// JSON definition and then prints the first authentication key returned for it.
        /// </summary>
        public void CreateIntegrationRuntimes()
        {
            var selfHostedRuntime = new SelfHostedIntegrationRuntime
            {
                Description = "Integration Runtime for project ..."
            };
            var runtimeResource = new IntegrationRuntimeResource(selfHostedRuntime);

            client.IntegrationRuntimes.CreateOrUpdate(resourceGroup, dataFactoryName, integrationRuntimeName, runtimeResource);
            Console.WriteLine(SafeJsonConvert.SerializeObject(runtimeResource, client.SerializationSettings));

            var authKeys = client.IntegrationRuntimes.ListAuthKeys(resourceGroup, dataFactoryName, integrationRuntimeName);
            Console.WriteLine("Authkey : " + authKeys.AuthKey1);
        }
    }
}

À noter que la création de l’Integration Runtime au sein de l’Azure Data Factory retourne et affiche la clé d’authentification nécessaire à l’enregistrement de ce dernier sur la machine.

Service lié

Créer un service lié Azure Data Lake Storage

// Declare the Azure Data Lake Store linked service, then create or update it in the factory.
Console.WriteLine("Creating or updating linked service ADLS [" + ADLSName + "] ...");

// The service principal key is passed wrapped in a SecureString.
var secureString = new SecureString(servicePrincipalKey);

var adlsProperties = new AzureDataLakeStoreLinkedService
{
    AccountName = ADLSName,
    DataLakeStoreUri = DataLakeStoreUri,
    ServicePrincipalId = ServicePrincipalId,
    Tenant = tenantId,
    SubscriptionId = subscriptionId,
    ResourceGroupName = resourceGroupName,
    ServicePrincipalKey = secureString
};
var storageLinkedService = new LinkedServiceResource(adlsProperties);

// The linked service is registered under the same name as the ADLS account.
client.LinkedServices.CreateOrUpdate(resourceGroupName, dataFactoryName, ADLSName, storageLinkedService);

// Print the JSON definition that was submitted.
Console.WriteLine(SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings));

Créer un service lié Azure Data Lake Analytics

// Declare the Azure Data Lake Analytics linked service, then create or update it in the factory.
Console.WriteLine("Creating or updating linked service ADLA [" + ADLAName + "] ...");

// The service principal key is passed wrapped in a SecureString.
var secureString = new SecureString(servicePrincipalKey);

var adlaProperties = new AzureDataLakeAnalyticsLinkedService
{
    AccountName = ADLAName,
    ServicePrincipalId = ServicePrincipalId,
    Tenant = tenantId,
    SubscriptionId = subscriptionId,
    ResourceGroupName = resourceGroupName,
    ServicePrincipalKey = secureString
};
var storageLinkedService = new LinkedServiceResource(adlaProperties);

// The linked service is registered under the same name as the ADLA account.
client.LinkedServices.CreateOrUpdate(resourceGroupName, dataFactoryName, ADLAName, storageLinkedService);

// Print the JSON definition that was submitted.
Console.WriteLine(SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings));

Créer un service lié Azure Blob Storage

// Declare the Azure Storage linked service, then create or update it in the factory.
Console.WriteLine("Creating or updating linked service BLOB [BLOB] ...");

// uSQLScriptBlobStorage is used as the connection string, wrapped in a SecureString.
var secureString = new SecureString(uSQLScriptBlobStorage);

var blobProperties = new AzureStorageLinkedService
{
    ConnectionString = secureString
};
var storageLinkedService = new LinkedServiceResource(blobProperties);

// The linked service is registered under the name held in uSQLScriptBlobName.
client.LinkedServices.CreateOrUpdate(resourceGroupName, dataFactoryName, uSQLScriptBlobName, storageLinkedService);

// Print the JSON definition that was submitted.
Console.WriteLine(SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings));

Créer un service lié SQL On Premise

// Declare the SQL Server linked service that connects through an integration runtime,
// then create or update it in the factory.
Console.WriteLine("Creating or updating linked service SQL On Prem [" + SQLName + "] ...");

// The connection string is passed wrapped in a SecureString.
var secureString = new SecureString(SQLConnectionString);

// NOTE(review): the reference is built from `integrationRuntimeResource`; presumably this
// variable holds the integration runtime's reference name - confirm against the caller.
var integrationRuntimeReference = new IntegrationRuntimeReference(integrationRuntimeResource);

var sqlServerProperties = new SqlServerLinkedService
{
    ConnectionString = secureString,
    ConnectVia = integrationRuntimeReference
};
var storageLinkedService = new LinkedServiceResource(sqlServerProperties);

// The linked service is registered under the name held in SQLName.
client.LinkedServices.CreateOrUpdate(resourceGroupName, dataFactoryName, SQLName, storageLinkedService);

// Print the JSON definition that was submitted.
Console.WriteLine(SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings));

DataSet

Créer un DataSet SQL

// Describe a SQL Server table dataset bound to the SQLName linked service.
var sqlTableProperties = new SqlServerTableDataset
{
    LinkedServiceName = new LinkedServiceReference { ReferenceName = SQLName },
    TableName = tableName
};
var sqlDataset = new DatasetResource(sqlTableProperties, name: dSName);

// Create or update the dataset in the factory under the name dSName.
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, dSName, sqlDataset);

// Print the JSON definition that was submitted.
Console.WriteLine(SafeJsonConvert.SerializeObject(sqlDataset, client.SerializationSettings));

Créer un DataSet Azure Data Lake Storage

// Declare the Azure Data Lake Store dataset, then create or update it in the factory.
Console.WriteLine("Creating or updating DataSet ADLS [" + dsName + "] ...");

var adlsFileProperties = new AzureDataLakeStoreDataset
{
    LinkedServiceName = new LinkedServiceReference { ReferenceName = ADLSName },

    // Target file: <cleanFileName>.gz inside the staging folder.
    FileName = cleanFileName + ".gz",
    FolderPath = ADLSStagingFolder,

    // Text format using ";" as column delimiter and the empty string as null value.
    Format = new TextFormat
    {
        ColumnDelimiter = ";",
        NullValue = ""
    },

    // GZip compression with the "Optimal" level.
    Compression = new DatasetGZipCompression
    {
        Level = "Optimal"
    }
};
var AdlsDataset = new DatasetResource(adlsFileProperties, name: dsName);

// Create or update the dataset in the factory under the name dsName.
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, dsName, AdlsDataset);

// Print the JSON definition that was submitted.
Console.WriteLine(SafeJsonConvert.SerializeObject(AdlsDataset, client.SerializationSettings));

Activités

Créer une activité de copie

// Copy activity whose input is the SQL dataset at index i and whose output is the
// ADLS dataset at the same index.
new CopyActivity
{
    Name = "Copy-" + dataSetSQLRessource[i].Name,
    Inputs = new List<DatasetReference>
    {
        new DatasetReference { ReferenceName = dataSetSQLRessource[i].Name }
    },
    Outputs = new List<DatasetReference>
    {
        new DatasetReference { ReferenceName = dataSetADLSRessource[i].Name }
    },
    Source = new SqlSource
    {
        // The reader query selects from the dataset's table and filters on DateModification,
        // using the pipeline parameter nbDaysReprise as the day offset.
        SqlReaderQuery = new Expression
        {
            Value = "select * from "
                + dataSetSQLRessourceTable.TableName
                + " WHERE DATEDIFF(day,DateModification,DATEADD(DAY,@{pipeline().parameters.nbDaysReprise},GETDATE())) < 0"
        }
    },
    Sink = new AzureDataLakeStoreSink
    {
        WriteBatchSize = 0,
        WriteBatchTimeout = "00:00:00"
    }
}

Créer une activité Http

// JSON payload posted by the web activity below.
// NOTE(review): the values are concatenated as-is; a double quote or backslash in any of
// them would yield invalid JSON - confirm the inputs are safe or escape them upstream.
string body =
    "{\"username\": \"" + powerBIAccount
    + "\",\"password\": \"" + powerBIPassword
    + "\",\"authorityUrl\": \"" + authorityUrl
    + "\",\"resourceUrl\": \"" + resourceUrl
    + "\",\"apiUrl\": \"" + apiUrl
    + "\",\"clientId\": \"" + clientId
    + "\",\"groupId\": \"" + groupId
    + "\"}";

// Web activity named "RefreshPowerBI": sends the payload to powerBIRefreshUrl and is
// chained after the activities listed in `dependencies`.
new WebActivity()
{
    Name = "RefreshPowerBI",
    Url = powerBIRefreshUrl,
    Method = "Post",
    Body = body,
    DependsOn = dependencies
}

Créer une activité U-SQL

// U-SQL activity for the script `job`: the script is read from the linked service
// uSQLScriptBlobName and the activity targets the linked service ADLAName.
new DataLakeAnalyticsUSQLActivity()
{
    Name = "Usql-" + job,
    DegreeOfParallelism = DegreeOfParallelism,

    // Linked service and path of the script.
    ScriptLinkedService = new LinkedServiceReference
    {
        ReferenceName = uSQLScriptBlobName
    },
    ScriptPath = uSQLScriptPath + '/' + job,

    // Linked service the activity is attached to.
    LinkedServiceName = new LinkedServiceReference
    {
        ReferenceName = ADLAName
    },
    DependsOn = dependencies,

    // Script parameters, each forwarded from the pipeline parameter of the same name.
    Parameters = new Dictionary<string, object>
    {
        { "directoryCol", new Expression { Value = "@{pipeline().parameters.directoryCol}" } },
        { "directoryDw", new Expression { Value = "@{pipeline().parameters.directoryDw}" } }
    }
}

Créer un Pipeline

// Pipeline made of AllActivities, exposing three string parameters.
var pipeline = new PipelineResource
{
    Parameters = new Dictionary<string, ParameterSpecification>
    {
        { "nbDaysReprise", new ParameterSpecification { Type = ParameterType.String } },
        { "directoryCol", new ParameterSpecification { Type = ParameterType.String } },
        { "directoryDw", new ParameterSpecification { Type = ParameterType.String } }
    },
    Activities = AllActivities
};

Créer un déclencheur

// Declare a schedule trigger for `pipelines`, create or update it, then request its start.
Console.WriteLine("Creating scheduled trigger, starting at " + startTime.ToString() + " running each "+frequency);

var scheduleDefinition = new ScheduleTrigger
{
    Pipelines = pipelines,

    // Fires once per `frequency` unit (Interval = 1), starting at startTime.
    Recurrence = new ScheduleTriggerRecurrence
    {
        StartTime = startTime,
        Frequency = frequency,
        Interval = 1
    }
};
var scheduleTrigger = new TriggerResource(scheduleDefinition, name: triggerName);

client.Triggers.CreateOrUpdate(ressourceGroup, factoryName, triggerName, scheduleTrigger);

// Print the JSON definition that was submitted.
Console.WriteLine(SafeJsonConvert.SerializeObject(scheduleTrigger, client.SerializationSettings));

// Ask the service to start the trigger that was just created or updated.
client.Triggers.BeginStart(ressourceGroup, factoryName, triggerName);

L’objectif de cet article est principalement de rappeler la syntaxe de création des différents objets Azure Data Factory, et d’éviter d’avoir à m’arracher les cheveux à nouveau devant le manque d’exemples sur MSDN durant la preview d’ADF V2.

Leave a Reply

Your email address will not be published. Required fields are marked *

You may use these HTML tags and attributes:

<a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>