# troubleshoot
c
Hey guys, we are currently setting up Airflow with DataHub. This works really great, but it seems like macros are not resolved. Does anyone know how to get them resolved? Example (see the attachment for the result):
s3_sensor = S3KeySensor(
  task_id="s3_file_check",
  aws_conn_id="aws_prod",
  bucket_key=bronze_path,
  bucket_name=bronze_bucket,
  poke_interval=60,
  mode="reschedule",
  timeout=60 * 60 * 8,
  dag=dag,
  inlets=[Dataset("s3", "test/{{ ds }}")],
)
The reason is that I would like to connect the actual file on S3 with the Airflow run. Thanks a lot for any suggestions 🙂 PS: I am using the following versions: apache-airflow-providers-amazon==4.1.0 and acryl-datahub-airflow-plugin==0.8.41.2
d
Currently, templates are not resolved in inlets, so you have to resolve them before you pass them in.
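A brief aside on why the workaround in the next message works (an editor's sketch based on the provider source for the versions listed above, not part of the original thread): S3KeySensor's own bucket_key and bucket_name are templated fields, so Airflow renders them before poke() runs, whereas inlets is not a templated field, so a Jinja string passed there stays literal.
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

# Templated fields are rendered before poke()/execute(); `inlets` is not among them,
# which is why "{{ ds }}" inside an inlet Dataset name is left unresolved.
print(S3KeySensor.template_fields)  # ('bucket_key', 'bucket_name') in provider 4.1.0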
c
Ah, thanks for the reply! 🙂 I solved it with this hint by using my own operators/sensors. For anyone who has the same problem, here is one example:
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from datahub_provider.entities import Dataset  # from acryl-datahub-airflow-plugin

class CustomS3KeySensor(S3KeySensor):
    def poke(self, context: 'Context'):
        # define inlets in the sensor/operator: bucket_key/bucket_name are templated fields and already rendered here
        self.inlets = [Dataset("s3", f"{self.bucket_name}/{self.bucket_key[0]}")]
        return super().poke(context)
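For reference, here is a minimal usage sketch (not from the original thread) showing the custom sensor dropped in place of the S3KeySensor from the first message; bronze_path, bronze_bucket and dag are assumed to be defined as in that snippet, and no templated inlets are passed because poke() sets them from the already-rendered fields:
s3_sensor = CustomS3KeySensor(
    task_id="s3_file_check",
    aws_conn_id="aws_prod",
    bucket_key=bronze_path,
    bucket_name=bronze_bucket,
    poke_interval=60,
    mode="reschedule",
    timeout=60 * 60 * 8,
    dag=dag,
)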
d
awesome, thanks for getting back with this nice solution
w
@cold-autumn-7250 With the code you provided, are you using a Python callable to populate the output of the CustomS3KeySensor.poke function? Or is this your task, and did you simply add the task_id and config directly here?