bitter-lawyer-49179
02/17/2023, 8:24 AM
I mistakenly set the connection id to datahub_rest_default while using the datahub_kafka connection type. This obviously caused issues when emitting metadata towards the end of my job. I noticed this and corrected the connection id to datahub_kafka_default. However, either Airflow or the DataHub plugin is not picking up on this change and keeps looking for the datahub_rest_default connection id instead of the corrected one.
I deleted the connection and set it up again from scratch, but no luck.
[2023-02-17, 08:12:03 UTC] {{logging_mixin.py:109}} INFO - Exception: Traceback (most recent call last):
File "/usr/local/airflow/.local/lib/python3.7/site-packages/datahub_provider/_plugin.py", line 284, in custom_on_success_callback
datahub_task_status_callback(context, status=InstanceRunResult.SUCCESS)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/datahub_provider/_plugin.py", line 136, in datahub_task_status_callback
DatahubGenericHook(context["_datahub_config"].datahub_conn_id)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/datahub_provider/hooks/datahub.py", line 189, in get_underlying_hook
conn = self.get_connection(self.datahub_conn_id)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/hooks/base.py", line 68, in get_connection
conn = Connection.get_connection_from_secrets(conn_id)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/models/connection.py", line 410, in get_connection_from_secrets
raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
airflow.exceptions.AirflowNotFoundException: The conn_id `datahub_rest_default` isn't defined
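For reference, a minimal sketch (using the same classes that appear in the traceback above) to check from inside the MWAA environment whether the corrected conn_id actually resolves:

    from datahub_provider.hooks.datahub import DatahubGenericHook

    # Should return the underlying Kafka hook; raises AirflowNotFoundException
    # if the conn_id is not defined in this environment.
    hook = DatahubGenericHook("datahub_kafka_default")
    print(hook.get_underlying_hook())

If this succeeds, the plugin is presumably still configured elsewhere (e.g. a conn_id setting in airflow.cfg) to use datahub_rest_default, rather than the connection itself being missing.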
Any help with this much appreciated.
PS: I tested this whole setup on a local instance of Airflow before I set it up on AWS MWAA. I did not face this issue on my local setup; it picked up the Kafka default connection and emitted metadata. (But I did not make the same connection-id mistake in my local setup.)
powerful-telephone-2424
02/17/2023, 9:40 AM
My Snowflake ingestion is failing with:
{snowflake.connector.CArrowIterator:372} - [Snowflake Exception] unknown arrow internal data type(0) for TIMESTAMP_LTZ data
Weirdly, it was working before this and then started failing on a second run a few minutes later. Can anyone help me understand this issue?
More detailed log:
> datahub ingest -c ../../zwa-snowflake.recipe.yaml
.
.
.
[2023-02-17 01:35:46,224] INFO {datahub.cli.ingest_cli:120} - Starting metadata ingestion
[2023-02-17 01:35:47,083] INFO {datahub.ingestion.source.snowflake.snowflake_v2:1389} - Checking current version
[2023-02-17 01:35:47,247] INFO {datahub.ingestion.source.snowflake.snowflake_v2:1395} - Checking current role
[2023-02-17 01:35:47,328] INFO {datahub.ingestion.source.snowflake.snowflake_v2:1401} - Checking current warehouse
[2023-02-17 01:35:47,424] INFO {datahub.ingestion.source.snowflake.snowflake_v2:1408} - Checking current edition
[2023-02-17 01:35:49,021] ERROR {snowflake.connector.CArrowIterator:372} - [Snowflake Exception] unknown arrow internal data type(0) for TIMESTAMP_LTZ data
[2023-02-17 01:35:49,032] INFO {datahub.ingestion.source.snowflake.snowflake_v2:627} - The role REVEFI_ROLE has `MANAGE GRANTS` privilege. This is not advisable and also not required.
[2023-02-17 01:35:49,565] ERROR {snowflake.connector.CArrowIterator:372} - [Snowflake Exception] unknown arrow internal data type(0) for TIMESTAMP_LTZ data
[2023-02-17 01:35:49,566] INFO {datahub.ingestion.source.snowflake.snowflake_v2:627} - The role REVEFI_ROLE has `MANAGE GRANTS` privilege. This is not advisable and also not required.
[2023-02-17 01:35:49,566] ERROR {datahub.ingestion.source.snowflake.snowflake_v2:224} - permission-error => No databases found. Please check permissions.
[2023-02-17 01:35:49,763] INFO {datahub.cli.ingest_cli:133} - Finished metadata ingestion
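For reference, a minimal repro sketch outside of DataHub (the connection parameters are placeholders) to check whether the Arrow result path itself trips on TIMESTAMP_LTZ, since the error is raised by the Snowflake connector rather than by DataHub:

    import snowflake.connector

    # Placeholder credentials; CURRENT_TIMESTAMP() returns a TIMESTAMP_LTZ value.
    conn = snowflake.connector.connect(account="...", user="...", password="...")
    cur = conn.cursor()
    cur.execute("SELECT CURRENT_TIMESTAMP()")
    print(cur.fetchone())
    cur.close()
    conn.close()

If this fails the same way, the problem is likely the installed snowflake-connector-python / pyarrow combination rather than the recipe itself.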
rich-pager-68736
02/17/2023, 9:44 AM
I'm getting these failures when emitting Snowflake metadata to GMS:
'failures': [{'error': 'Unable to emit metadata to DataHub GMS',
'info': {'message': '413 Client Error: Request Entity Too Large for url: '
'<https://xxxxxxxxx/api/gms/aspects?action=ingestProposal>',
'id': 'urn:li:dataset:(urn:li:dataPlatform:snowflake,YYYYYYYYYYYYYYYY,PROD)'}},
{'error': 'Unable to emit metadata to DataHub GMS',
'info': {'message': '413 Client Error: Request Entity Too Large for url: '
'<https://xxxxxxxxx/api/gms/aspects?action=ingestProposal>',
'id': 'urn:li:dataset:(urn:li:dataPlatform:snowflake,YYYYYYYYYYYYYYYY,PROD)'}},
bitter-lawyer-49179
02/17/2023, 11:01 AM
rhythmic-ram-79027
02/17/2023, 1:03 PM
glamorous-elephant-17130
02/17/2023, 1:14 PM
elegant-salesmen-99143
02/17/2023, 3:54 PM
I've configured the sink with a token:
sink:
  type: datahub-rest
  config:
    server: 'https://datahub-stage.XXX.XX/gms'
    token: '<[my token]!>'
But I still get an authorization error in the ingest logs:
'[2023-02-17 14:50:02,607] INFO {datahub.ingestion.run.pipeline:196} - Source configured successfully.\n'
'[2023-02-17 14:50:02,609] INFO {datahub.cli.ingest_cli:120} - Starting metadata ingestion\n'
'[2023-02-17 14:50:02,724] ERROR {datahub.ingestion.run.pipeline:62} - failed to write record with workunit '
"urn:li:container:ba0a76bc8027a651a2a4c28d02d447d8-containerProperties with ('Unable to emit metadata to DataHub GMS', {'message': '401 "
"Client Error: Unauthorized for url: <https://datahub-stage.XXX.XX/gms/aspects?action=ingestProposal>', 'id': "
"'urn:li:container:ba0a76bc8027a651a2a4c28d02d447d8'}) and info {'message': '401 Client Error: Unauthorized for url: "
"<https://datahub-stage.XXX.XX/gms/aspects?action=ingestProposal>', 'id': 'urn:li:container:ba0a76bc8027a651a2a4c28d02d447d8'}\n"
'[2023-02-17 14:50:02,730] ERROR {datahub.ingestion.run.pipeline:62} - failed to write record with workunit '
"urn:li:container:ba0a76bc8027a651a2a4c28d02d447d8-status with ('Unable to emit metadata to DataHub GMS', {'message': '401 Client Error: "
"Unauthorized for url: <https://datahub-stage.XXX.XX/gms/aspects?action=ingestProposal>', 'id': "
"'urn:li:container:ba0a76bc8027a651a2a4c28d02d447d8'}) and info {'message': '401 Client Error: Unauthorized for url: "
What am I doing wrong?
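One way to check whether the token itself is accepted, independent of the recipe (server and token below are the same placeholders as above):

    from datahub.emitter.rest_emitter import DatahubRestEmitter

    # Same placeholder server/token as in the sink config above.
    emitter = DatahubRestEmitter("https://datahub-stage.XXX.XX/gms", token="<my token>")
    emitter.test_connection()  # should fail if the server rejects the credentials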
PS. We're on 0.9.6.1 if it's relevant.
adorable-river-99503
02/17/2023, 7:36 PM
eager-lunch-3897
02/17/2023, 8:36 PM
Has anyone set datahubUpgrade.enabled: false? It defaults to true, and we are running into an issue with the auth secrets not existing yet and the job never running due to the pre-install helm hooks. Any ideas?
glamorous-elephant-17130
02/18/2023, 3:45 PM
bitter-lawyer-49179
02/18/2023, 4:36 PM
When I run the lineage_backend_demo.py DAG, I don't see the Airflow task and icon in between my upstream table (inlet) and the downstream table (outlet). I thought I'd see Airflow as a platform and see it as part of my lineage (like the lineage with the Redshift tables).
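For comparison, this is roughly how inlets and outlets are declared so the lineage backend can emit the task node between the two datasets (a minimal sketch; the DAG id and dataset names are illustrative):

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from datahub_provider.entities import Dataset

    with DAG("lineage_backend_sketch", start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:
        transform = BashOperator(
            task_id="transform",
            bash_command="echo transform",
            # The lineage backend/plugin turns these into dataset -> task -> dataset lineage.
            inlets=[Dataset("redshift", "warehouse.schema.upstream_table")],
            outlets=[Dataset("redshift", "warehouse.schema.downstream_table")],
        )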
gentle-lifeguard-88494
02/18/2023, 4:39 PM
best-wire-59738
02/20/2023, 3:47 AM
creamy-tent-10151
02/20/2023, 5:06 AM
flat-painter-78331
02/20/2023, 7:42 AM
I'm getting this error when trying to log in:
AuthenticatorChain:80 - Authentication chain failed to resolve a valid authentication. Errors: [(com.datahub.authentication.authenticator.DataHubSystemAuthenticator,Failed to authenticate inbound request: Provided credentials do not match known system client id & client secret. Check your configuration values...), (com.datahub.authentication.authenticator.DataHubTokenAuthenticator,Failed to authenticate inbound request: Authorization header missing 'Bearer' prefix.)]
! @80jbm284n - Internal server error, for (POST) [/logIn] ->
2/17/2023 3:08:25 PM
2/17/2023 3:08:25 PM play.api.UnexpectedException: Unexpected exception[RuntimeException: Failed to generate session token for user]
2/17/2023 3:08:25 PM at play.api.http.HttpErrorHandlerExceptions$.throwableToUsefulException(HttpErrorHandler.scala:358)
2/17/2023 3:08:25 PM at play.api.http.DefaultHttpErrorHandler.onServerError(HttpErrorHandler.scala:264)
2/17/2023 3:08:25 PM at play.core.server.AkkaHttpServer$$anonfun$2.applyOrElse(AkkaHttpServer.scala:436)
2/17/2023 3:08:25 PM at play.core.server.AkkaHttpServer$$anonfun$2.applyOrElse(AkkaHttpServer.scala:428)
2/17/2023 3:08:25 PM at scala.concurrent.Future.$anonfun$recoverWith$1(Future.scala:417)
2/17/2023 3:08:25 PM at scala.concurrent.impl.Promise.$anonfun$transformWith$1(Promise.scala:41)
2/17/2023 3:08:25 PM at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
2/17/2023 3:08:25 PM at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
2/17/2023 3:08:25 PM at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
2/17/2023 3:08:25 PM at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
2/17/2023 3:08:25 PM at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
2/17/2023 3:08:25 PM at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
2/17/2023 3:08:25 PM at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
2/17/2023 3:08:25 PM at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
2/17/2023 3:08:25 PM at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
2/17/2023 3:08:25 PM at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
2/17/2023 3:08:25 PM at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
2/17/2023 3:08:25 PM at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
2/17/2023 3:08:25 PM at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
2/17/2023 3:08:25 PM Caused by: java.lang.RuntimeException: Failed to generate session token for user
2/17/2023 3:08:25 PM at client.AuthServiceClient.generateSessionTokenForUser(AuthServiceClient.java:101)
2/17/2023 3:08:25 PM at controllers.AuthenticationController.logIn(AuthenticationController.java:182)
2/17/2023 3:08:25 PM at router.Routes$$anonfun$routes$1.$anonfun$applyOrElse$17(Routes.scala:581)
2/17/2023 3:08:25 PM at play.core.routing.HandlerInvokerFactory$$anon$8.resultCall(HandlerInvoker.scala:150)
2/17/2023 3:08:25 PM at play.core.routing.HandlerInvokerFactory$$anon$8.resultCall(HandlerInvoker.scala:149)
2/17/2023 3:08:25 PM at play.core.routing.HandlerInvokerFactory$JavaActionInvokerFactory$$anon$3$$anon$4$$anon$5.invocation(HandlerInvoker.scala:115)
2/17/2023 3:08:25 PM at play.core.j.JavaAction$$anon$1.call(JavaAction.scala:119)
2/17/2023 3:08:25 PM at play.http.DefaultActionCreator$1.call(DefaultActionCreator.java:33)
2/17/2023 3:08:25 PM at play.core.j.JavaAction.$anonfun$apply$8(JavaAction.scala:175)
2/17/2023 3:08:25 PM at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
2/17/2023 3:08:25 PM at scala.util.Success.$anonfun$map$1(Try.scala:255)
2/17/2023 3:08:25 PM at scala.util.Success.map(Try.scala:213)
2/17/2023 3:08:25 PM at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
2/17/2023 3:08:25 PM at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
2/17/2023 3:08:25 PM at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
2/17/2023 3:08:25 PM at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
2/17/2023 3:08:25 PM at play.core.j.HttpExecutionContext.$anonfun$execute$1(HttpExecutionContext.scala:64)
2/17/2023 3:08:25 PM at play.api.libs.streams.Execution$trampoline$.execute(Execution.scala:70)
2/17/2023 3:08:25 PM at play.core.j.HttpExecutionContext.execute(HttpExecutionContext.scala:59)
2/17/2023 3:08:25 PM at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
2/17/2023 3:08:25 PM at scala.concurrent.impl.Promise$KeptPromise$Kept.onComplete(Promise.scala:372)
2/17/2023 3:08:25 PM at scala.concurrent.impl.Promise$KeptPromise$Kept.onComplete$(Promise.scala:371)
2/17/2023 3:08:25 PM at scala.concurrent.impl.Promise$KeptPromise$Successful.onComplete(Promise.scala:379)
2/17/2023 3:08:25 PM at scala.concurrent.impl.Promise.transform(Promise.scala:33)
2/17/2023 3:08:25 PM at scala.concurrent.impl.Promise.transform$(Promise.scala:31)
2/17/2023 3:08:25 PM at scala.concurrent.impl.Promise$KeptPromise$Successful.transform(Promise.scala:379)
2/17/2023 3:08:25 PM at scala.concurrent.Future.map(Future.scala:292)
2/17/2023 3:08:25 PM at scala.concurrent.Future.map$(Future.scala:292)
2/17/2023 3:08:25 PM at scala.concurrent.impl.Promise$KeptPromise$Successful.map(Promise.scala:379)
2/17/2023 3:08:25 PM at scala.concurrent.Future$.apply(Future.scala:659)
2/17/2023 3:08:25 PM at play.core.j.JavaAction.apply(JavaAction.scala:176)
2/17/2023 3:08:25 PM at play.api.mvc.Action.$anonfun$apply$4(Action.scala:82)
2/17/2023 3:08:25 PM at scala.concurrent.Future.$anonfun$flatMap$1(Future.scala:307)
2/17/2023 3:08:25 PM ... 14 common frames omitted
2/17/2023 3:08:25 PM Caused by: java.lang.RuntimeException: Bad response from the Metadata Service: HTTP/1.1 401 Unauthorized ResponseEntityProxy{[Content-Type: text/html;charset=iso-8859-1,Content-Length: 567,Chunked: false]}
2/17/2023 3:08:25 PM at client.AuthServiceClient.generateSessionTokenForUser(AuthServiceClient.java:97)
2/17/2023 3:08:25 PM ... 46 common frames omitted
Can anyone help me figure out what I should do here?
Thanks
acceptable-rain-30599
02/20/2023, 8:17 AM
powerful-cat-68806
02/20/2023, 9:15 AM
I'm getting a 404 error when trying to access it publicly. I've recreated the ingress controller and re-assigned R53 to the new endpoint. Same result.
What's the root cause here? I presume the blocker is between the ingress endpoint and the app itself.
rich-policeman-92383
02/20/2023, 10:08 AM
datahub delete --hard -f -p "spark"
datahub delete --hard -f -p "bigquery"
alert-printer-93847
02/20/2023, 10:30 AM
wide-afternoon-79955
02/20/2023, 2:41 PM
I want to remove Invite Users and disable user login with username and password; I just want OKTA-authenticated users. Is that possible?
I tried disabling JAAS auth by setting AUTH_JAAS_ENABLED=false for datahub-frontend, but it's not working.
alert-traffic-45034
02/20/2023, 5:07 PM
I'm seeing this error:
Validation error (FieldUndefined@[searchResultFields/datasetProfiles/sizeInBytes]) : Field 'sizeInBytes' in type 'DatasetProfile' is undefined (code undefined)
Thanks
ambitious-shoe-92590
02/21/2023, 1:33 AM
I'm trying to ingest a nested Struct type with "pull"-based ingestion. I have a nested field called data which contains a number of child key:values. When I ingest this data with DataHub, the resulting dataset has the data field, but it is un-expandable.
I've read up on field paths and the differences between v1 and v2 paths, but I'm a bit confused about how to actually get to the point of being able to "expand" the nested struct. It seems emitters are used in some examples, but from my understanding that is only if you want to manually add fields to the schema?
Any help would be appreciated; the data is coming from an S3 source, if that makes a difference.
gray-ocean-32209
02/21/2023, 8:09 AM
We are trying to emit lineage with DatahubEmitterOperator, but it does not seem to work; the task log shows:
[2023-02-21, 07:13:46 UTC] {_plugin.py:179} INFO - Emitted Start Datahub Dataprocess Instance: DataProcessInstance(id='redshift_lineage_emission_dag_emit_lineage_manual__2023-02-21T07:13:31.971338+00:00', urn=<datahub.utilities.urns.data_process_instance_urn.DataProcessInstanceUrn object at 0xffff9a989ac0>, orchestrator='***', cluster='prod', type='BATCH_AD_HOC', template_urn=<datahub.utilities.urns.data_job_urn.DataJobUrn object at 0xffff864bb790>, parent_instance=None, properties={'run_id': 'manual__2023-02-21T07:13:31.971338+00:00', 'duration': '0.375603', 'start_date': '2023-02-21 07:13:45.274840+00:00', 'end_date': '2023-02-21 07:13:45.650443+00:00', 'execution_date': '2023-02-21 07:13:31.971338+00:00', 'try_number': '1', 'hostname': '05badb3885d0', 'max_tries': '1', 'external_executor_id': 'None', 'pid': '3840758', 'state': 'success', 'operator': 'DatahubEmitterOperator', 'priority_weight': '1', 'unixname': 'default', 'log_url': '<http://localhost:8080/log?execution_date=2023-02-21T07%3A13%3A31.971338%2B00%3A00&task_id=emit_lineage&dag_id=redshift_lineage_emission_dag&map_index=-1>'}, url='<http://localhost:8080/log?execution_date=2023-02-21T07%3A13%3A31.971338%2B00%3A00&task_id=emit_lineage&dag_id=redshift_lineage_emission_dag&map_index=-1>', inlets=[], outlets=[], upstream_urns=[])
It works with BashOperator when setting inlets and outlets; we can see that lineage in DataHub.
lineage_emission_dag.py
"""Lineage Emission

This example demonstrates how to emit lineage to DataHub within an Airflow DAG.
"""
from datetime import timedelta

import datahub.emitter.mce_builder as builder
from airflow import DAG
# from airflow.providers.amazon.aws.operators.redshift import RedshiftSQLOperator
from contrib.operators.redshift_sql_operator import RedshiftSQLOperator
from airflow.utils.dates import days_ago
from datahub_provider.operators.datahub import DatahubEmitterOperator

global_vars = {
    'REDSHIFT_CONN_ID': 'redshift_prod',
}

default_args = {
    "owner": "de",
    "depends_on_past": False,
    "email": ["de-team@zynga.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(minutes=120),
}

with DAG(
    "lineage_emission_dag",
    default_args=default_args,
    description="An example DAG demonstrating lineage emission within an Airflow DAG.",
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    catchup=False,
) as dag:
    # This example shows a RedshiftSQLOperator followed by a lineage emission with DatahubEmitterOperator.
    sql = """select count(*) as tables
    from information_schema.tables
    where table_type = 'BASE TABLE'"""

    redshift_transformation_task = RedshiftSQLOperator(
        task_id="redshift_transformation",
        dag=dag,
        redshift_conn_id=global_vars['REDSHIFT_CONN_ID'],
        sql=sql,
    )

    # The same DatahubEmitterOperator can be used to emit lineage in any context.
    datahub_emit_lineage_task = DatahubEmitterOperator(
        task_id="emit_lineage",
        datahub_conn_id="datahub_rest_default",
        mces=[
            builder.make_lineage_mce(
                upstream_urns=[
                    builder.make_dataset_urn("redshift", "warehouse.zbatch.tableA"),
                    builder.make_dataset_urn("redshift", "warehouse.zbatch.tableB"),
                ],
                downstream_urn=builder.make_dataset_urn(
                    "redshift", "warehouse.zbatch.tableC"
                ),
            )
        ],
    )

    redshift_transformation_task >> datahub_emit_lineage_task
It looks like DatahubEmitterOperator is not working with the latest release.
Our setup:
DataHub version 0.9.6
acryl-datahub 0.9.6
acryl-datahub-airflow-plugin 0.9.6
Airflow v2.3.4
gray-ocean-32209
02/21/2023, 8:14 AM
rough-car-65301
02/21/2023, 2:21 PM
I'm running into this error:
ImportError: cannot import name 'make_avsc_object' from 'avro.schema'
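For what it's worth, this ImportError often shows up when two Avro distributions (for example avro and the older avro-python3) are installed in the same environment and one shadows the other. A quick diagnostic sketch to see which package Python actually imports:

    import avro
    import avro.schema

    print(avro.__file__)                             # which installed package wins
    print(getattr(avro, "__version__", "unknown"))
    print(hasattr(avro.schema, "make_avsc_object"))  # False would explain the ImportError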
rough-car-65301
02/21/2023, 2:22 PM
acceptable-nest-20465
02/21/2023, 4:17 PM
alert-traffic-45034
02/21/2023, 6:10 PM
After I deleted the datahub* index in OpenSearch, the front-page entries like domains, platforms, tags, etc., still exist.
I am not sure where this info got cached / persisted.
Thanks for any guidance in advance.
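A quick way to double-check which datahub-prefixed indices are actually left after the delete (a sketch assuming a local, unauthenticated OpenSearch endpoint; adjust the host and auth for your cluster):

    import requests

    # _cat/indices supports wildcards; this lists every remaining datahub* index.
    resp = requests.get("http://localhost:9200/_cat/indices/datahub*?format=json")
    for idx in resp.json():
        print(idx["index"], idx["docs.count"])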
fierce-electrician-85924
02/22/2023, 10:16 AM
I am emitting fine-grained (column-level) lineage through a datajob like this:

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    FineGrainedLineage,
    FineGrainedLineageDownstreamType,
    FineGrainedLineageUpstreamType,
)
from datahub.metadata.schema_classes import ChangeTypeClass, DataJobInputOutputClass


# Helpers as in the DataHub fine-grained lineage examples; the platform is a placeholder.
def datasetUrn(tbl):
    return builder.make_dataset_urn("postgres", tbl)


def fldUrn(tbl, fld):
    return builder.make_schema_field_urn(datasetUrn(tbl), fld)


fineGrainedLineages = [
    FineGrainedLineage(
        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
        upstreams=[fldUrn("dataset1", "column1")],
        downstreamType=FineGrainedLineageDownstreamType.FIELD,
        downstreams=[fldUrn("dataset2", "column2")],
    )
]

dataJobInputOutput = DataJobInputOutputClass(
    inputDatasets=[datasetUrn("dataset1")],
    outputDatasets=[datasetUrn("dataset2")],
    inputDatajobs=None,
    inputDatasetFields=[
        fldUrn("dataset1", "column1"),
    ],
    outputDatasetFields=[
        fldUrn("dataset2", "column2"),
    ],
    fineGrainedLineages=fineGrainedLineages,
)

dataJobLineageMcp = MetadataChangeProposalWrapper(
    entityUrn=builder.make_data_job_urn("spark", "data-flow", "datajob1", "DEV"),
    aspect=dataJobInputOutput,
    changeType=ChangeTypeClass.UPSERT,
    entityType='datajob',
    aspectName='dataJobInputOutput',
)

# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("http://localhost:8080")

# Emit metadata!
result = emitter.emit_mcp(dataJobLineageMcp)
But it's not showing the lineage in the UI. What could be the reason behind it? (I am using version 0.9.2)
big-fireman-42102
02/22/2023, 12:20 PM
I ran datahub docker quickstart in the terminal and this error shows up: Error response from daemon: driver failed programming external connectivity on endpoint datahub-gms (c7259513a4f59fb2e360eab2377b0ee38fce02b0f111b69e57eb22ad843d2411): Bind for 0.0.0.0:8080 failed: port is already allocated
I restarted Docker but it's still the same error. I am using a Mac M1. Does anyone have any idea why, and how I can fix it? Thanks in advance 🙂
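A quick check (standard library only) to confirm something is still holding port 8080 before re-running quickstart:

    import socket

    s = socket.socket()
    in_use = s.connect_ex(("127.0.0.1", 8080)) == 0  # 0 means something accepted the connection
    s.close()
    print("port 8080 is in use" if in_use else "port 8080 looks free")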