
Configure log streaming

Last Updated: Jun 29, 2018

DI-On-Flume is a data collection, aggregation, and delivery engine that schedules a streamlined DataX task (Channel -> Writer) into a Flume execution context, building on Flume's Source, Channel, and Sink programming framework.

With DI-On-Flume, you can collect heterogeneous data using a Flume Source and deliver it to the target by using the DI Writer plug-in (Flume's official Sinks also work). DI-On-Flume is not a replacement for either Flume or DI, but an integrated solution that combines the advantages of both.

The logical architecture of DI-On-Flume is shown in the following figure.

DI-On-Flume logical architecture
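
The whole job is written as a single JSON script: the Flume Source and Channel are described under the reader and setting.channel sections, and the DI Writer is described under the writer section. The following skeleton is a minimal sketch of that layout, with the key names taken from the complete examples later in this topic and all values left as placeholders.

    {
        "type": "stream",
        "version": "1.0",
        "configuration": {
            "setting": {
                "speed": {...},//Concurrency or rate limit
                "errorLimit": {...},//Error-record tolerance
                "channel": {...}//Flume Channel that buffers the collected events
            },
            "reader": {
                "plugin": "flume",//Flume Source that collects the log data
                "parameter": {...}
            },
            "writer": {
                "plugin": "datahubwriter",//DI Writer that delivers the collected data to DataHub
                "parameter": {...}
            }
        }
    }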

Basic process of configuring a log stream task

  1. Prepare the required log file on a machine in the added resource group, and grant read and write permissions on it.

  2. Add a scheduling resource, and configure the machine so that all required software is automatically installed after server initialization.

  3. Configure a log stream task.

  4. View the synchronization log result in DataHub.

Procedure

Add a scheduling resource

  1. Log on to the Data Integration page, and select Resource > Resource, as shown in the following figure.

    Resources

  2. On the resource management page, click Add Resource Group in the upper-right corner and enter a resource group name, as shown in the following figure.

    ResourceGroup

  3. Click Next to configure the server. See the following figure.

    AddServer

  4. Click Server init to initialize the server.

    ServerInitial

    Note: If the initialization fails, try to restart alisa with the following command: /home/admin/alisatasknode/target/alisatasknode/bin/serverctl restart.

Configure a script task

  • Configure a file-channel log stream job.
    {
        "type": "stream",
        "version": "1.0",
        "configuration": {
            "setting": {
                "speed": {
                    "channel": 100//Concurrency
                },
                "errorLimit": {
                    "record": "0"//Maximum number of error records allowed
                },
                "channel": {
                    "agent.channels": "file",//Channel name
                    "agent.channels.file.checkpointDir": "",//The directory where checkpoint files are stored (see the note after these examples)
                    "agent.channels.file.dataDirs": "",//The directory where the data files are stored (see the note after these examples)
                    "agent.channels.file.capacity": "100000",//Maximum number of events stored in the channel
                    "agent.channels.file.transactionCapacity": "100000",//Maximum number of events per transaction
                    "agent.channels.file.byteCapacity": "2000000",//Maximum channel capacity in bytes
                    "agent.channels.file.type": "file"//Channel type
                }
            },
            "reader": {
                "plugin": "flume",
                "parameter": {
                    "agent.sources": "taildir",//Source name
                    "agent.sources.taildir.type": "TAILDIR",//Source type
                    "agent.sources.taildir.channels": "file",//Channel that the source writes to (matches the channel name above)
                    "agent.sources.taildir.filegroups": "f1",//File group name
                    "agent.sources.taildir.filegroups.f1": "/home/f1/log.txt",//Path of the file monitored by file group f1
                    "agent.sources.taildir.headers.f1.headerKey1": "value1",//Header key-value pair added to each event from file group f1
                    "agent.sources.taildir.fileHeader": "true"//Whether to add a header that stores the absolute path of the file
                }
            },
            "writer": {
                "plugin": "datahubwriter",
                "parameter": {
                    "agent.sinks": "dataXSinkWrapper",//Sink name
                    "agent.sinks.dataXSinkWrapper.channel": "file",//Channel that the sink reads from (matches the channel name above)
                    "agent.sinks.dataXSinkWrapper.type": "xxxxx.DataXSinkWrapper",//Sink type (class path)
                    "endpoint": "",//DataHub endpoint
                    "accessId": "",
                    "accessKey": "",//AccessKey information
                    "project": "",//Project in DataHub
                    "topic": "",//Topic in DataHub
                    "maxRetryCount": 500//Number of retries
                }
            }
        }
    }
  • Configure a memory-channel log stream job:

    {
        "configuration": {
            "reader": {
                "parameter": {
                    "agent.sources": "taildir",//Source name
                    "agent.sources.taildir.channels": "memoryChannel",//Channel that the source writes to
                    "agent.sources.taildir.fileHeader": "true",//Whether to add a header that stores the absolute path of the file
                    "agent.sources.taildir.filegroups": "lzz",//File group name
                    "agent.sources.taildir.filegroups.lzz": "/home/wb-lzz242732/lzz/0",//Path of the file monitored by the file group
                    "agent.sources.taildir.type": "TAILDIR"//Source type
                },
                "plugin": "flume"
            },
            "setting": {
                "channel": {
                    "agent.channels": "memoryChannel",//Channel name
                    "agent.channels.memoryChannel.byteCapacity": "800000",//Maximum channel capacity in bytes
                    "agent.channels.memoryChannel.capacity": "10000",//Maximum number of events stored in the channel
                    "agent.channels.memoryChannel.transactionCapacity": "10000",//Maximum number of events per transaction
                    "agent.channels.memoryChannel.type": "memory"//Channel type
                },
                "errorLimit": {
                    "record": "0"//Maximum number of error records allowed
                },
                "speed": {
                    "byte": 1048576//Transfer rate limit in bytes
                }
            },
            "writer": {
                "parameter": {
                    "agent.sinks": "dataXSinkWrapper",//Sink name
                    "agent.sinks.dataXSinkWrapper.channel": "memoryChannel",//Channel that the sink reads from (matches the channel name above)
                    "agent.sinks.dataXSinkWrapper.type": "xxxxx.DataXSinkWrapper",//Sink type (class path)
                    "accessId": "",
                    "accessKey": "",//AccessKey information
                    "endpoint": "",//DataHub endpoint
                    "maxRetryCount": 500,//Number of retries
                    "project": "xxx",//Project created in DataHub
                    "topic": "yyy"//Topic created in DataHub
                },
                "plugin": "datahubwriter"
            }
        },
        "type": "stream",
        "version": "1.0"
    }
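
In both examples, the channel buffers events between the Flume Source and the DI Writer. A memory channel keeps events in memory, which is faster but loses whatever is buffered if the agent process stops; a file channel persists events to its checkpoint and data directories, so buffered events survive a restart at the cost of some throughput. For the file-channel job, checkpointDir and dataDirs are left empty above and must be filled in before the job can run. The following fragment is a minimal sketch using hypothetical directories; any paths that the job's run-as user can read and write will do, and placing the data directories on a separate disk from the checkpoint directory can improve file channel throughput.

    "channel": {
        "agent.channels": "file",//Channel name
        "agent.channels.file.type": "file",//Channel type
        "agent.channels.file.checkpointDir": "/home/admin/flume/checkpoint",//Hypothetical checkpoint directory
        "agent.channels.file.dataDirs": "/home/admin/flume/data"//Hypothetical data directory
        //...the remaining file channel settings are the same as in the file-channel example above
    }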

View the synchronization log result in DataHub

  1. Log on to the DataHub console, and click Create Project to create a project.

  2. Select View > Create Topic in the upper-right corner to create a topic.

  3. Complete the configuration on the Create Topic page, and click Create.

    You can create a topic in either of two ways: create it directly, or import the MaxCompute table structure.

  4. You can view the synchronized log data after the topic is created.

Tip: The Real-time Log feature is still in beta testing. To apply for trial use, open a ticket with the Data Integration team.
